提交 87c65f3b 编写于 作者: L liuyao

Merge remote-tracking branch 'origin/3.0' into enh/triggerCheckPoint2

...@@ -168,3 +168,11 @@ All [scalar functions](../function/#scalar-functions) are available in stream pr ...@@ -168,3 +168,11 @@ All [scalar functions](../function/#scalar-functions) are available in stream pr
- [unique](../function/#unique) - [unique](../function/#unique)
- [mode](../function/#mode) - [mode](../function/#mode)
## Pause\Resume stream
1.pause stream
PAUSE STREAM [IF EXISTS] stream_name;
If "IF EXISTS" is not specified and the stream does not exist, an error will be reported; If "IF EXISTS" is specified and the stream does not exist, success is returned; If the stream exists, paused all stream tasks.
2.resume stream
RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
If "IF EXISTS" is not specified and the stream does not exist, an error will be reported. If "IF EXISTS" is specified and the stream does not exist, success is returned; If the stream exists, all of the stream tasks will be resumed. If "IGNORE UntREATED" is specified, data written during the pause period of stream is ignored when resuming stream.
...@@ -249,3 +249,12 @@ T = 最新事件时间 - DELETE_MARK ...@@ -249,3 +249,12 @@ T = 最新事件时间 - DELETE_MARK
- [unique](../function/#unique) - [unique](../function/#unique)
- [mode](../function/#mode) - [mode](../function/#mode)
## 暂停、恢复流计算
1.流计算暂停计算任务
PAUSE STREAM [IF EXISTS] stream_name;
没有指定IF EXISTS,如果该stream不存在,则报错;如果存在,则暂停流计算。指定了IF EXISTS,如果该stream不存在,则返回成功;如果存在,则暂停流计算
2.流计算恢复计算任务
RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
没有指定IF EXISTS,如果该stream不存在,则报错,如果存在,则恢复流计算;指定了IF EXISTS,如果stream不存在,则返回成功;如果存在,则恢复流计算。如果指定IGNORE UNTREATED,则恢复流计算时,忽略流计算暂停期间写入的数据。
...@@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); ...@@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const char* blockDecode(SSDataBlock* pBlock, const char* pData); const char* blockDecode(SSDataBlock* pBlock, const char* pData);
// for debug // for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr);
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
......
...@@ -231,6 +231,7 @@ typedef struct SField { ...@@ -231,6 +231,7 @@ typedef struct SField {
uint8_t type; uint8_t type;
int8_t flags; int8_t flags;
int32_t bytes; int32_t bytes;
char comment[TSDB_COL_COMMENT_LEN];
} SField; } SField;
typedef struct SRetention { typedef struct SRetention {
...@@ -309,6 +310,7 @@ struct SSchema { ...@@ -309,6 +310,7 @@ struct SSchema {
col_id_t colId; col_id_t colId;
int32_t bytes; int32_t bytes;
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
char comment[TSDB_COL_COMMENT_LEN];
}; };
struct SSchema2 { struct SSchema2 {
...@@ -1163,6 +1165,9 @@ typedef struct { ...@@ -1163,6 +1165,9 @@ typedef struct {
int32_t vgId; int32_t vgId;
int8_t syncState; int8_t syncState;
int8_t syncRestore; int8_t syncRestore;
int64_t syncTerm;
int64_t roleTimeMs;
int64_t startTimeMs;
int8_t syncCanRead; int8_t syncCanRead;
int64_t cacheUsage; int64_t cacheUsage;
int64_t numOfTables; int64_t numOfTables;
...@@ -1179,9 +1184,9 @@ typedef struct { ...@@ -1179,9 +1184,9 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
int8_t syncState; int8_t syncState;
int8_t syncRestore;
int64_t syncTerm; int64_t syncTerm;
int8_t syncRestore;
int64_t roleTimeMs; int64_t roleTimeMs;
} SMnodeLoad; } SMnodeLoad;
...@@ -2387,6 +2392,9 @@ typedef struct { ...@@ -2387,6 +2392,9 @@ typedef struct {
int8_t type; int8_t type;
int8_t flags; int8_t flags;
int32_t bytes; int32_t bytes;
bool hasColComment;
char* colComment;
int32_t colCommentLen;
// TSDB_ALTER_TABLE_DROP_COLUMN // TSDB_ALTER_TABLE_DROP_COLUMN
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES // TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
int8_t colModType; int8_t colModType;
......
...@@ -130,25 +130,25 @@ ...@@ -130,25 +130,25 @@
#define TK_NK_EQ 112 #define TK_NK_EQ 112
#define TK_USING 113 #define TK_USING 113
#define TK_TAGS 114 #define TK_TAGS 114
#define TK_BOOL 115 #define TK_COMMENT 115
#define TK_TINYINT 116 #define TK_BOOL 116
#define TK_SMALLINT 117 #define TK_TINYINT 117
#define TK_INT 118 #define TK_SMALLINT 118
#define TK_INTEGER 119 #define TK_INT 119
#define TK_BIGINT 120 #define TK_INTEGER 120
#define TK_FLOAT 121 #define TK_BIGINT 121
#define TK_DOUBLE 122 #define TK_FLOAT 122
#define TK_BINARY 123 #define TK_DOUBLE 123
#define TK_NCHAR 124 #define TK_BINARY 124
#define TK_UNSIGNED 125 #define TK_NCHAR 125
#define TK_JSON 126 #define TK_UNSIGNED 126
#define TK_VARCHAR 127 #define TK_JSON 127
#define TK_MEDIUMBLOB 128 #define TK_VARCHAR 128
#define TK_BLOB 129 #define TK_MEDIUMBLOB 129
#define TK_VARBINARY 130 #define TK_BLOB 130
#define TK_GEOMETRY 131 #define TK_VARBINARY 131
#define TK_DECIMAL 132 #define TK_GEOMETRY 132
#define TK_COMMENT 133 #define TK_DECIMAL 133
#define TK_MAX_DELAY 134 #define TK_MAX_DELAY 134
#define TK_WATERMARK 135 #define TK_WATERMARK 135
#define TK_ROLLUP 136 #define TK_ROLLUP 136
...@@ -354,7 +354,6 @@ ...@@ -354,7 +354,6 @@
#define TK_VIEW 336 #define TK_VIEW 336
#define TK_WAL 337 #define TK_WAL 337
#define TK_NK_SPACE 600 #define TK_NK_SPACE 600
#define TK_NK_COMMENT 601 #define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602 #define TK_NK_ILLEGAL 602
......
...@@ -106,6 +106,8 @@ typedef struct SMCtbCursor { ...@@ -106,6 +106,8 @@ typedef struct SMCtbCursor {
void *pVal; void *pVal;
int kLen; int kLen;
int vLen; int vLen;
int8_t paused;
int lock;
} SMCtbCursor; } SMCtbCursor;
typedef struct SRowBuffPos { typedef struct SRowBuffPos {
...@@ -295,7 +297,9 @@ int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool ...@@ -295,7 +297,9 @@ int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/ */
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock); int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
void (*closeCtbCursor)(SMCtbCursor *pCtbCur);
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur); tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
} SStoreMeta; } SStoreMeta;
......
...@@ -23,10 +23,11 @@ extern "C" { ...@@ -23,10 +23,11 @@ extern "C" {
#include "query.h" #include "query.h"
#include "querynodes.h" #include "querynodes.h"
#define DESCRIBE_RESULT_COLS 4 #define DESCRIBE_RESULT_COLS 5
#define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE) #define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE) #define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE) #define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_COL_COMMENT_LEN (TSDB_COL_COMMENT_LEN)
#define SHOW_CREATE_DB_RESULT_COLS 2 #define SHOW_CREATE_DB_RESULT_COLS 2
#define SHOW_CREATE_DB_RESULT_FIELD1_LEN (TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE) #define SHOW_CREATE_DB_RESULT_FIELD1_LEN (TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE)
...@@ -155,7 +156,7 @@ typedef struct SColumnDefNode { ...@@ -155,7 +156,7 @@ typedef struct SColumnDefNode {
ENodeType type; ENodeType type;
char colName[TSDB_COL_NAME_LEN]; char colName[TSDB_COL_NAME_LEN];
SDataType dataType; SDataType dataType;
char comments[TSDB_TB_COMMENT_LEN]; char comments[TSDB_COL_COMMENT_LEN];
bool sma; bool sma;
} SColumnDefNode; } SColumnDefNode;
...@@ -214,6 +215,7 @@ typedef struct SAlterTableStmt { ...@@ -214,6 +215,7 @@ typedef struct SAlterTableStmt {
char newColName[TSDB_COL_NAME_LEN]; char newColName[TSDB_COL_NAME_LEN];
STableOptions* pOptions; STableOptions* pOptions;
SDataType dataType; SDataType dataType;
char colComment[TSDB_COL_COMMENT_LEN];
SValueNode* pVal; SValueNode* pVal;
} SAlterTableStmt; } SAlterTableStmt;
......
...@@ -241,6 +241,7 @@ typedef struct SSyncState { ...@@ -241,6 +241,7 @@ typedef struct SSyncState {
bool canRead; bool canRead;
SyncTerm term; SyncTerm term;
int64_t roleTimeMs; int64_t roleTimeMs;
int64_t startTimeMs;
} SSyncState; } SSyncState;
int32_t syncInit(); int32_t syncInit();
......
...@@ -230,6 +230,7 @@ typedef enum ELogicConditionType { ...@@ -230,6 +230,7 @@ typedef enum ELogicConditionType {
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN #define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_TB_COMMENT_LEN 1025 #define TSDB_TB_COMMENT_LEN 1025
#define TSDB_COL_COMMENT_LEN 1025
#define TSDB_QUERY_ID_LEN 26 #define TSDB_QUERY_ID_LEN 26
#define TSDB_TRANS_OPER_LEN 16 #define TSDB_TRANS_OPER_LEN 16
......
...@@ -284,7 +284,6 @@ static const SSysDbTableSchema topicSchema[] = { ...@@ -284,7 +284,6 @@ static const SSysDbTableSchema topicSchema[] = {
{.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
}; };
static const SSysDbTableSchema subscriptionSchema[] = { static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
...@@ -295,12 +294,13 @@ static const SSysDbTableSchema subscriptionSchema[] = { ...@@ -295,12 +294,13 @@ static const SSysDbTableSchema subscriptionSchema[] = {
}; };
static const SSysDbTableSchema vnodesSchema[] = { static const SSysDbTableSchema vnodesSchema[] = {
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "replica", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "dnode_ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "restored", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true},
}; };
static const SSysDbTableSchema userUserPrivilegesSchema[] = { static const SSysDbTableSchema userUserPrivilegesSchema[] = {
......
...@@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ...@@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
} }
// for debug // for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) { char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
int32_t size = 2048 * 1024; int32_t size = 2048 * 1024;
*pDataBuf = taosMemoryCalloc(size, 1); *pDataBuf = taosMemoryCalloc(size, 1);
char* dumpBuf = *pDataBuf; char* dumpBuf = *pDataBuf;
...@@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
int32_t len = 0; int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64 "%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n", "|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName); pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
......
...@@ -534,6 +534,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq ...@@ -534,6 +534,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI8(&encoder, pField->flags) < 0) return -1; if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeCStr(&encoder, pField->comment) < 0) return -1;
} }
for (int32_t i = 0; i < pReq->numOfTags; ++i) { for (int32_t i = 0; i < pReq->numOfTags; ++i) {
...@@ -542,6 +543,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq ...@@ -542,6 +543,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI8(&encoder, pField->flags) < 0) return -1; if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeCStr(&encoder, pField->comment) < 0) return -1;
} }
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) { for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
...@@ -608,6 +610,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -608,6 +610,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI8(&decoder, &field.flags) < 0) return -1; if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1; if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1; if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.comment) < 0) return -1;
if (taosArrayPush(pReq->pColumns, &field) == NULL) { if (taosArrayPush(pReq->pColumns, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -620,6 +623,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -620,6 +623,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI8(&decoder, &field.flags) < 0) return -1; if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1; if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1; if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.comment) < 0) return -1;
if (taosArrayPush(pReq->pTags, &field) == NULL) { if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -1079,8 +1083,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -1079,8 +1083,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1; if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1; if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
if (tEncodeI32(&encoder, reserved) < 0) return -1; if (tEncodeI32(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, pload->roleTimeMs) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, pload->startTimeMs) < 0) return -1;
} }
// mnode loads // mnode loads
...@@ -1104,6 +1108,16 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -1104,6 +1108,16 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1; if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1; if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1;
if (tEncodeI8(&encoder, pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1; if (tEncodeI8(&encoder, pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1;
// vnode extra
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
int64_t reserved = 0;
if (tEncodeI64(&encoder, pload->syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -1148,7 +1162,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -1148,7 +1162,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
for (int32_t i = 0; i < vlen; ++i) { for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad vload = {0}; SVnodeLoad vload = {0};
int64_t reserved64 = 0; vload.syncTerm = -1;
int32_t reserved32 = 0; int32_t reserved32 = 0;
if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1; if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1; if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
...@@ -1162,14 +1176,15 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -1162,14 +1176,15 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1; if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1; if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1; if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1;
if (tDecodeI64(&decoder, &reserved64) < 0) return -1; if (tDecodeI64(&decoder, &vload.roleTimeMs) < 0) return -1;
if (tDecodeI64(&decoder, &reserved64) < 0) return -1; if (tDecodeI64(&decoder, &vload.startTimeMs) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) { if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
} }
// mnode loads
if (tDecodeI8(&decoder, &pReq->mload.syncState) < 0) return -1; if (tDecodeI8(&decoder, &pReq->mload.syncState) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->mload.syncRestore) < 0) return -1; if (tDecodeI8(&decoder, &pReq->mload.syncRestore) < 0) return -1;
...@@ -1200,6 +1215,17 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -1200,6 +1215,17 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI8(&decoder, &pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1; if (tDecodeI8(&decoder, &pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1;
} }
// vnode extra
if (!tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pLoad = taosArrayGet(pReq->pVloads, i);
int64_t reserved = 0;
if (tDecodeI64(&decoder, &pLoad->syncTerm) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
}
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
...@@ -2301,7 +2327,7 @@ int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp) ...@@ -2301,7 +2327,7 @@ int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp)
} }
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns; int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols); pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) return -1; if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) { for (int32_t i = 0; i < totalCols; ++i) {
...@@ -3684,7 +3710,7 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) { ...@@ -3684,7 +3710,7 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns; int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
if (totalCols > 0) { if (totalCols > 0) {
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols); pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) return -1; if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) { for (int32_t i = 0; i < totalCols; ++i) {
......
...@@ -347,8 +347,11 @@ typedef struct { ...@@ -347,8 +347,11 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
ESyncState syncState; ESyncState syncState;
int64_t syncTerm;
bool syncRestore; bool syncRestore;
bool syncCanRead; bool syncCanRead;
int64_t roleTimeMs;
int64_t startTimeMs;
ESyncRole nodeRole; ESyncRole nodeRole;
} SVnodeGid; } SVnodeGid;
......
...@@ -424,6 +424,47 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S ...@@ -424,6 +424,47 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S
return 0; return 0;
} }
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
bool stateChanged = false;
bool roleChanged = pGid->syncState != pVload->syncState ||
(pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
pGid->roleTimeMs != pVload->roleTimeMs;
if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
pGid->startTimeMs != pVload->startTimeMs) {
mInfo(
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
"canRead:%d, dnode:%d",
vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
pGid->syncState = pVload->syncState;
pGid->syncTerm = pVload->syncTerm;
pGid->syncRestore = pVload->syncRestore;
pGid->syncCanRead = pVload->syncCanRead;
pGid->startTimeMs = pVload->startTimeMs;
pGid->roleTimeMs = pVload->roleTimeMs;
stateChanged = true;
}
return stateChanged;
}
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
bool stateChanged = false;
bool roleChanged = pObj->syncState != pMload->syncState ||
(pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
pObj->roleTimeMs != pMload->roleTimeMs;
if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
pObj->syncTerm, pMload->syncTerm);
pObj->syncState = pMload->syncState;
pObj->syncTerm = pMload->syncTerm;
pObj->syncRestore = pMload->syncRestore;
pObj->roleTimeMs = pMload->roleTimeMs;
stateChanged = true;
}
return stateChanged;
}
static int32_t mndProcessStatusReq(SRpcMsg *pReq) { static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStatusReq statusReq = {0}; SStatusReq statusReq = {0};
...@@ -496,26 +537,21 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -496,26 +537,21 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pVgroup->compStorage = pVload->compStorage; pVgroup->compStorage = pVload->compStorage;
pVgroup->pointsWritten = pVload->pointsWritten; pVgroup->pointsWritten = pVload->pointsWritten;
} }
bool roleChanged = false; bool stateChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
SVnodeGid *pGid = &pVgroup->vnodeGid[vg]; SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
if (pGid->dnodeId == statusReq.dnodeId) { if (pGid->dnodeId == statusReq.dnodeId) {
if (pGid->syncState != pVload->syncState || pGid->syncRestore != pVload->syncRestore || if (pVload->startTimeMs == 0) {
pGid->syncCanRead != pVload->syncCanRead) { pVload->startTimeMs = statusReq.rebootTime;
mInfo( }
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d " if (pVload->roleTimeMs == 0) {
"canRead:%d, dnode:%d", pVload->roleTimeMs = statusReq.rebootTime;
pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead,
syncStr(pVload->syncState), pVload->syncRestore, pVload->syncCanRead, pDnode->id);
pGid->syncState = pVload->syncState;
pGid->syncRestore = pVload->syncRestore;
pGid->syncCanRead = pVload->syncCanRead;
roleChanged = true;
} }
stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
break; break;
} }
} }
if (roleChanged) { if (stateChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) { if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
...@@ -531,23 +567,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -531,23 +567,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id); SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
if (pObj != NULL) { if (pObj != NULL) {
bool roleChanged = pObj->syncState != statusReq.mload.syncState || if (statusReq.mload.roleTimeMs == 0) {
(statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm); statusReq.mload.roleTimeMs = statusReq.rebootTime;
bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore;
if (roleChanged || restoreChanged) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64
" to %" PRId64,
pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore,
statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm);
pObj->syncState = statusReq.mload.syncState;
pObj->syncRestore = statusReq.mload.syncRestore;
pObj->syncTerm = statusReq.mload.syncTerm;
} }
mndUpdateMnodeState(pObj, &statusReq.mload);
if (roleChanged) {
pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs();
}
mndReleaseMnode(pMnode, pObj); mndReleaseMnode(pMnode, pObj);
} }
......
...@@ -187,7 +187,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) ...@@ -187,7 +187,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
bool roleChanged = false; bool stateChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
SVnodeGid *pGid = &pVgroup->vnodeGid[vg]; SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
if (pGid->dnodeId == dnodeId) { if (pGid->dnodeId == dnodeId) {
...@@ -199,13 +199,14 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) ...@@ -199,13 +199,14 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
pGid->syncState = TAOS_SYNC_STATE_OFFLINE; pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
pGid->syncRestore = 0; pGid->syncRestore = 0;
pGid->syncCanRead = 0; pGid->syncCanRead = 0;
roleChanged = true; pGid->startTimeMs = 0;
stateChanged = true;
} }
break; break;
} }
} }
if (roleChanged) { if (stateChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) { if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs, mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
......
...@@ -807,7 +807,6 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -807,7 +807,6 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
ESdbStatus objStatus = 0; ESdbStatus objStatus = 0;
char *pWrite; char *pWrite;
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
int64_t dummyTimeMs = 0;
pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId); pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
if (pSelfObj == NULL) { if (pSelfObj == NULL) {
...@@ -858,16 +857,9 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -858,16 +857,9 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
int64_t roleTimeMs = (isDnodeOnline) ? pObj->roleTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) { colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
// state of old term / no status report => use dummyTimeMs
if (pObj->syncTerm > pSelfObj->syncTerm) {
mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false);
} else {
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false);
}
numOfRows++; numOfRows++;
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);
......
...@@ -835,6 +835,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat ...@@ -835,6 +835,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pSchema->bytes = pField->bytes; pSchema->bytes = pField->bytes;
pSchema->flags = pField->flags; pSchema->flags = pField->flags;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pSchema->comment, pField->comment, TSDB_COL_COMMENT_LEN);
pSchema->colId = pDst->nextColId; pSchema->colId = pDst->nextColId;
pDst->nextColId++; pDst->nextColId++;
} }
...@@ -848,6 +849,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat ...@@ -848,6 +849,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
SSCHMEA_SET_IDX_ON(pSchema); SSCHMEA_SET_IDX_ON(pSchema);
} }
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pSchema->comment, pField->comment, TSDB_COL_COMMENT_LEN);
pSchema->colId = pDst->nextColId; pSchema->colId = pDst->nextColId;
pDst->nextColId++; pDst->nextColId++;
} }
......
...@@ -961,27 +961,24 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -961,27 +961,24 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0; int32_t numOfRows = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t cols = 0; int32_t cols = 0;
int64_t curMs = taosGetTimestampMs();
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup); pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) { for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pGid = &pVgroup->vnodeGid[i];
SColumnInfoData *pColInfo = NULL; SColumnInfoData *pColInfo = NULL;
cols = 0; cols = 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->replica, false);
char buf[20] = {0};
STR_TO_VARSTR(buf, syncStr(pVgid->syncState));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
// db_name
const char *dbname = mndGetDbStr(pVgroup->dbName); const char *dbname = mndGetDbStr(pVgroup->dbName);
char b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
if (dbname != NULL) { if (dbname != NULL) {
...@@ -992,20 +989,33 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -992,20 +989,33 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)b1, false); colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
// dnode is online?
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
if (pDnode == NULL) {
mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
break;
}
bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
char buf[20] = {0};
ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
STR_TO_VARSTR(buf, syncStr(syncState));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgid->dnodeId, false); colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
char b2[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
if (pDnode != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(b2, pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);
} else {
STR_WITH_MAXSIZE_TO_VARSTR(b2, "NULL", TSDB_EP_LEN + VARSTR_HEADER_SIZE);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)b2, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pDnode);
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
......
...@@ -169,7 +169,9 @@ int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); ...@@ -169,7 +169,9 @@ int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta); int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock); SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock); int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
void metaPauseCtbCursor(SMCtbCursor* pCtbCur);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid); SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseStbCursor(SMStbCursor* pStbCur); void metaCloseStbCursor(SMStbCursor* pStbCur);
......
...@@ -423,40 +423,75 @@ SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) { ...@@ -423,40 +423,75 @@ SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) {
pCtbCur->pMeta = pMeta; pCtbCur->pMeta = pMeta;
pCtbCur->suid = uid; pCtbCur->suid = uid;
if (lock) { pCtbCur->lock = lock;
metaRLock(pMeta); pCtbCur->paused = 1;
}
ret = tdbTbcOpen(pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL); ret = metaResumeCtbCursor(pCtbCur, 1);
if (ret < 0) { if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pCtbCur);
return NULL; return NULL;
} }
return pCtbCur;
}
// move to the suid void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
ctbIdxKey.suid = uid; if (pCtbCur) {
ctbIdxKey.uid = INT64_MIN; if (!pCtbCur->paused) {
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c); if (pCtbCur->pMeta && pCtbCur->lock) metaULock(pCtbCur->pMeta);
if (c > 0) { if (pCtbCur->pCur) {
tdbTbcMoveToNext(pCtbCur->pCur); tdbTbcClose(pCtbCur->pCur);
}
}
tdbFree(pCtbCur->pKey);
tdbFree(pCtbCur->pVal);
} }
taosMemoryFree(pCtbCur);
}
return pCtbCur; void metaPauseCtbCursor(SMCtbCursor* pCtbCur) {
if (!pCtbCur->paused) {
tdbTbcClose((TBC*)pCtbCur->pCur);
if (pCtbCur->lock) {
metaULock(pCtbCur->pMeta);
}
pCtbCur->paused = 1;
}
} }
void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) { int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first) {
if (pCtbCur) { if (pCtbCur->paused) {
if (pCtbCur->pMeta && lock) metaULock(pCtbCur->pMeta); pCtbCur->paused = 0;
if (pCtbCur->pCur) {
tdbTbcClose(pCtbCur->pCur);
tdbFree(pCtbCur->pKey); if (pCtbCur->lock) {
tdbFree(pCtbCur->pVal); metaRLock(pCtbCur->pMeta);
}
int ret = 0;
ret = tdbTbcOpen(pCtbCur->pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
if (ret < 0) {
metaCloseCtbCursor(pCtbCur);
return -1;
} }
taosMemoryFree(pCtbCur); if (first) {
SCtbIdxKey ctbIdxKey;
// move to the suid
ctbIdxKey.suid = pCtbCur->suid;
ctbIdxKey.uid = INT64_MIN;
int c = 0;
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
if (c > 0) {
tdbTbcMoveToNext(pCtbCur->pCur);
}
} else {
int c = 0;
ret = tdbTbcMoveTo(pCtbCur->pCur, pCtbCur->pKey, pCtbCur->kLen, &c);
if (c < 0) {
tdbTbcMoveToPrev(pCtbCur->pCur);
} else {
tdbTbcMoveToNext(pCtbCur->pCur);
}
}
} }
return 0;
} }
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
...@@ -1414,7 +1449,7 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) { ...@@ -1414,7 +1449,7 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
} }
taosHashCleanup(pSepecifiedUidMap); taosHashCleanup(pSepecifiedUidMap);
metaCloseCtbCursor(pCur, 1); metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -414,7 +414,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -414,7 +414,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (k == 0) { if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j); void* colData = colDataGetData(pColData, j);
tqTrace("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
} }
if (IS_SET_NULL(pCol)) { if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
......
...@@ -98,6 +98,8 @@ void initMetadataAPI(SStoreMeta* pMeta) { ...@@ -98,6 +98,8 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache; pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
pMeta->openCtbCursor = metaOpenCtbCursor; pMeta->openCtbCursor = metaOpenCtbCursor;
pMeta->resumeCtbCursor = metaResumeCtbCursor;
pMeta->pauseCtbCursor = metaPauseCtbCursor;
pMeta->closeCtbCursor = metaCloseCtbCursor; pMeta->closeCtbCursor = metaCloseCtbCursor;
pMeta->ctbCursorNext = metaCtbCursorNext; pMeta->ctbCursorNext = metaCtbCursorNext;
} }
......
...@@ -216,7 +216,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { ...@@ -216,7 +216,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
cfgRsp.numOfTags = schemaTag.nCols; cfgRsp.numOfTags = schemaTag.nCols;
cfgRsp.numOfColumns = schema.nCols; cfgRsp.numOfColumns = schema.nCols;
cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags)); cfgRsp.pSchemas = (SSchema *)taosMemoryCalloc(cfgRsp.numOfColumns + cfgRsp.numOfTags, sizeof(SSchema));
memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols); memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
if (schemaTag.nCols) { if (schemaTag.nCols) {
...@@ -380,6 +380,9 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { ...@@ -380,6 +380,9 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = TD_VID(pVnode); pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = state.state; pLoad->syncState = state.state;
pLoad->syncRestore = state.restored; pLoad->syncRestore = state.restored;
pLoad->syncTerm = state.term;
pLoad->roleTimeMs = state.roleTimeMs;
pLoad->startTimeMs = state.startTimeMs;
pLoad->syncCanRead = state.canRead; pLoad->syncCanRead = state.canRead;
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode); pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
...@@ -452,7 +455,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) { ...@@ -452,7 +455,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
taosArrayPush(list, &info); taosArrayPush(list, &info);
} }
metaCloseCtbCursor(pCur, 1); metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -473,7 +476,7 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) { ...@@ -473,7 +476,7 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
taosArrayPush(list, &id); taosArrayPush(list, &id);
} }
metaCloseCtbCursor(pCur, 1); metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -536,7 +539,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { ...@@ -536,7 +539,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
++(*num); ++(*num);
} }
metaCloseCtbCursor(pCur, 0); metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -78,6 +78,10 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) { ...@@ -78,6 +78,10 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_NOTE_LEN, 4); infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_NOTE_LEN, 4);
code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
} }
if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_COL_COMMENT_LEN, 5);
code = blockDataAppendColInfo(pBlock, &infoData);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pOutput = pBlock; *pOutput = pBlock;
...@@ -99,7 +103,9 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, ...@@ -99,7 +103,9 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
SColumnInfoData* pCol3 = taosArrayGet(pBlock->pDataBlock, 2); SColumnInfoData* pCol3 = taosArrayGet(pBlock->pDataBlock, 2);
// Note // Note
SColumnInfoData* pCol4 = taosArrayGet(pBlock->pDataBlock, 3); SColumnInfoData* pCol4 = taosArrayGet(pBlock->pDataBlock, 3);
char buf[DESCRIBE_RESULT_FIELD_LEN] = {0}; // Comment
SColumnInfoData* pCol5 = taosArrayGet(pBlock->pDataBlock, 4);
char buf[DESCRIBE_RESULT_COL_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) { if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) {
continue; continue;
...@@ -112,6 +118,8 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, ...@@ -112,6 +118,8 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
colDataSetVal(pCol3, pBlock->info.rows, (const char*)&bytes, false); colDataSetVal(pCol3, pBlock->info.rows, (const char*)&bytes, false);
STR_TO_VARSTR(buf, i >= pMeta->tableInfo.numOfColumns ? "TAG" : ""); STR_TO_VARSTR(buf, i >= pMeta->tableInfo.numOfColumns ? "TAG" : "");
colDataSetVal(pCol4, pBlock->info.rows, buf, false); colDataSetVal(pCol4, pBlock->info.rows, buf, false);
STR_TO_VARSTR(buf, pMeta->schema[i].comment);
colDataSetVal(pCol5, pBlock->info.rows, buf, false);
++(pBlock->info.rows); ++(pBlock->info.rows);
} }
if (pBlock->info.rows <= 0) { if (pBlock->info.rows <= 0) {
...@@ -456,14 +464,19 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) { ...@@ -456,14 +464,19 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) { for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
SSchema* pSchema = pCfg->pSchemas + i; SSchema* pSchema = pCfg->pSchemas + i;
char type[32]; char type[32];
char comments[TSDB_COL_COMMENT_LEN + 16] = {0};
sprintf(type, "%s", tDataTypes[pSchema->type].name); sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) { if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) { } else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
if (pSchema->comment[0]) {
sprintf(comments, " COMMENT '%s'", pSchema->comment);
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type); *len +=
sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s%s", ((i > 0) ? ", " : ""), pSchema->name, type, comments);
} }
} }
...@@ -471,14 +484,18 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) { ...@@ -471,14 +484,18 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfTags; ++i) { for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i; SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
char type[32]; char type[32];
char comments[TSDB_COL_COMMENT_LEN + 16] = {0};
sprintf(type, "%s", tDataTypes[pSchema->type].name); sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) { if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) { } else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
if (pSchema->comment[0]) {
sprintf(comments, " COMMENT '%s'", pSchema->comment);
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s%s", ((i > 0) ? ", " : ""), pSchema->name, type, comments);
} }
} }
......
...@@ -117,7 +117,7 @@ void* tableListDestroy(STableListInfo* pTableListInfo); ...@@ -117,7 +117,7 @@ void* tableListDestroy(STableListInfo* pTableListInfo);
void tableListClear(STableListInfo* pTableListInfo); void tableListClear(STableListInfo* pTableListInfo);
int32_t tableListGetOutputGroups(const STableListInfo* pTableList); int32_t tableListGetOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList); bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
int32_t* num); int32_t* num);
...@@ -183,13 +183,17 @@ void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); ...@@ -183,13 +183,17 @@ void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode); int32_t convertFillType(int32_t mode);
int32_t resultrowComparAsc(const void* p1, const void* p2); int32_t resultrowComparAsc(const void* p1, const void* p2);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI); int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI);
char* getStreamOpName(uint16_t opType);
void printDataBlock(SSDataBlock* pBlock, const char* flag); void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols);
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta);
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI); SStorageAPI* pStorageAPI);
#endif // TDENGINE_EXECUTIL_H #endif // TDENGINE_EXECUTIL_H
...@@ -21,11 +21,11 @@ extern "C" { ...@@ -21,11 +21,11 @@ extern "C" {
#include "os.h" #include "os.h"
#include "tcommon.h" #include "tcommon.h"
#include "theap.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tsort.h" #include "tsort.h"
#include "ttszip.h" #include "ttszip.h"
#include "tvariant.h" #include "tvariant.h"
#include "theap.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
#include "executil.h" #include "executil.h"
...@@ -39,14 +39,14 @@ extern "C" { ...@@ -39,14 +39,14 @@ extern "C" {
#include "tlockfree.h" #include "tlockfree.h"
#include "tmsg.h" #include "tmsg.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
//#include "tstream.h" // #include "tstream.h"
//#include "tstreamUpdate.h" // #include "tstreamUpdate.h"
#include "tlrucache.h" #include "tlrucache.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct STsdbReader STsdbReader; typedef struct STsdbReader STsdbReader;
typedef struct STqReader STqReader; typedef struct STqReader STqReader;
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0) #define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN) #define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
...@@ -208,7 +208,7 @@ typedef struct STableScanBase { ...@@ -208,7 +208,7 @@ typedef struct STableScanBase {
SLimitInfo limitInfo; SLimitInfo limitInfo;
// there are more than one table list exists in one task, if only one vnode exists. // there are more than one table list exists in one task, if only one vnode exists.
STableListInfo* pTableListInfo; STableListInfo* pTableListInfo;
TsdReader readerAPI; TsdReader readerAPI;
} STableScanBase; } STableScanBase;
typedef struct STableScanInfo { typedef struct STableScanInfo {
...@@ -224,7 +224,7 @@ typedef struct STableScanInfo { ...@@ -224,7 +224,7 @@ typedef struct STableScanInfo {
int8_t assignBlockUid; int8_t assignBlockUid;
bool hasGroupByTag; bool hasGroupByTag;
bool countOnly; bool countOnly;
// TsdReader readerAPI; // TsdReader readerAPI;
} STableScanInfo; } STableScanInfo;
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
...@@ -258,21 +258,21 @@ typedef struct STagScanFilterContext { ...@@ -258,21 +258,21 @@ typedef struct STagScanFilterContext {
} STagScanFilterContext; } STagScanFilterContext;
typedef struct STagScanInfo { typedef struct STagScanInfo {
SColumnInfo* pCols; SColumnInfo* pCols;
SSDataBlock* pRes; SSDataBlock* pRes;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t curPos; int32_t curPos;
SLimitNode* pSlimit; SLimitNode* pSlimit;
SReadHandle readHandle; SReadHandle readHandle;
STableListInfo* pTableListInfo; STableListInfo* pTableListInfo;
uint64_t suid; uint64_t suid;
void* pCtbCursor; void* pCtbCursor;
SNode* pTagCond; SNode* pTagCond;
SNode* pTagIndexCond; SNode* pTagIndexCond;
STagScanFilterContext filterCtx; STagScanFilterContext filterCtx;
SArray* aUidTags; // SArray<STUidTagInfo> SArray* aUidTags; // SArray<STUidTagInfo>
SArray* aFilterIdxs; // SArray<int32_t> SArray* aFilterIdxs; // SArray<int32_t>
SStorageAPI* pStorageAPI; SStorageAPI* pStorageAPI;
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -342,12 +342,12 @@ typedef struct SStreamScanInfo { ...@@ -342,12 +342,12 @@ typedef struct SStreamScanInfo {
SExprSupp tagCalSup; SExprSupp tagCalSup;
int32_t primaryTsIndex; // primary time stamp slot id int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle; SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
SArray* pBlockLists; // multiple SSDatablock. SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex; int32_t updateResIndex;
int32_t blockType; // current block type int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
...@@ -434,7 +434,7 @@ typedef struct SIntervalAggOperatorInfo { ...@@ -434,7 +434,7 @@ typedef struct SIntervalAggOperatorInfo {
int64_t limit; int64_t limit;
bool slimited; bool slimited;
int64_t slimit; int64_t slimit;
uint64_t curGroupId; // initialize to UINT64_MAX uint64_t curGroupId; // initialize to UINT64_MAX
uint64_t handledGroupNum; uint64_t handledGroupNum;
BoundedQueue* pBQ; BoundedQueue* pBQ;
} SIntervalAggOperatorInfo; } SIntervalAggOperatorInfo;
...@@ -450,7 +450,7 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo { ...@@ -450,7 +450,7 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
typedef struct SOpCheckPointInfo { typedef struct SOpCheckPointInfo {
uint16_t checkPointId; uint16_t checkPointId;
SHashObj* children; // key:child id SHashObj* children; // key:child id
} SOpCheckPointInfo; } SOpCheckPointInfo;
typedef struct SStreamIntervalOperatorInfo { typedef struct SStreamIntervalOperatorInfo {
...@@ -472,10 +472,9 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -472,10 +472,9 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pPullWins; // SPullWindowInfo SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex; int32_t pullIndex;
SSDataBlock* pPullDataRes; SSDataBlock* pPullDataRes;
bool isFinal;
SArray* pChildren; SArray* pChildren;
int32_t numOfChild; int32_t numOfChild;
SStreamState* pState; // void SStreamState* pState; // void
SWinKey delKey; SWinKey delKey;
uint64_t numOfDatapack; uint64_t numOfDatapack;
SArray* pUpdated; SArray* pUpdated;
...@@ -526,7 +525,6 @@ typedef struct SStreamSessionAggOperatorInfo { ...@@ -526,7 +525,6 @@ typedef struct SStreamSessionAggOperatorInfo {
void* pDelIterator; void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved; bool ignoreExpiredDataSaved;
SArray* pUpdated; SArray* pUpdated;
...@@ -612,8 +610,9 @@ typedef struct SStreamFillOperatorInfo { ...@@ -612,8 +610,9 @@ typedef struct SStreamFillOperatorInfo {
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo); int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName,
void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo); SExecTaskInfo* pTaskInfo);
void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo); void cleanupBasicInfo(SOptrBasicInfo* pInfo);
...@@ -684,7 +683,8 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); ...@@ -684,7 +683,8 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, SStateStore* pStore); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
SStateStore* pStore);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName); uint64_t* pGp, void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
...@@ -696,8 +696,8 @@ bool groupbyTbname(SNodeList* pGroupList); ...@@ -696,8 +696,8 @@ bool groupbyTbname(SNodeList* pGroupList);
int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI); int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI);
int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI); int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
...@@ -715,11 +715,17 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr ...@@ -715,11 +715,17 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr
void doClearBufferedBlocks(SStreamScanInfo* pInfo); void doClearBufferedBlocks(SStreamScanInfo* pInfo);
uint64_t calcGroupId(char* pData, int32_t len); uint64_t calcGroupId(char* pData, int32_t len);
void streamOpReleaseState(struct SOperatorInfo* pOperator); void streamOpReleaseState(struct SOperatorInfo* pOperator);
void streamOpReloadState(struct SOperatorInfo* pOperator); void streamOpReloadState(struct SOperatorInfo* pOperator);
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup);
void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup);
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo);
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType);
bool compareVal(const char* v, const SStateKeys* pKey);
int32_t encodeSTimeWindowAggSupp(void **buf, STimeWindowAggSupp* pTwAggSup); int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
void* decodeSTimeWindowAggSupp(void *buf, STimeWindowAggSupp* pTwAggSup); TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -218,7 +218,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -218,7 +218,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
pRes->info.id.groupId = getTableGroupId(pTableList, pRes->info.id.uid); pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
pInfo->indexOfBufferedRes += 1; pInfo->indexOfBufferedRes += 1;
return pRes; return pRes;
} else { } else {
......
...@@ -58,16 +58,6 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { ...@@ -58,16 +58,6 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->groupId = groupId; pRowSup->groupId = groupId;
} }
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
int64_t* ts = (int64_t*)pColData->pData;
int32_t delta = includeEndpoint ? 1 : 0;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo)); SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo));
...@@ -250,7 +240,7 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu ...@@ -250,7 +240,7 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows,
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
} }
......
...@@ -1931,7 +1931,7 @@ void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psu ...@@ -1931,7 +1931,7 @@ void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psu
*type = pTableList->idInfo.tableType; *type = pTableList->idInfo.tableType;
} }
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL); ASSERT(pTableList->map != NULL && slot != NULL);
...@@ -2177,12 +2177,67 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -2177,12 +2177,67 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void printDataBlock(SSDataBlock* pBlock, const char* flag) { char* getStreamOpName(uint16_t opType) {
switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return "stream scan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "project";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "interval single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
return "interval final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return "interval semi";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
return "stream fill";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return "session single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return "session semi";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return "session final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return "state single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return "stream partitionby";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return "stream event";
}
return "";
}
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) { if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===%s: Block is Null or Empty", flag); qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
return; return;
} }
char* pBuf = NULL; char* pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf)); qDebug("%s", dumpBlockData(pBlock, flag, &pBuf, taskIdStr));
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
} }
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
return;
}
if (qDebugFlag & DEBUG_DEBUG) {
char* pBuf = NULL;
char flagBuf[64];
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
}
}
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
int64_t* ts = (int64_t*)pColData->pData;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}
...@@ -1070,3 +1070,15 @@ void streamOpReloadState(SOperatorInfo* pOperator) { ...@@ -1070,3 +1070,15 @@ void streamOpReloadState(SOperatorInfo* pOperator) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
} }
bool compareVal(const char* v, const SStateKeys* pKey) {
if (IS_VAR_DATA_TYPE(pKey->type)) {
if (varDataLen(v) != varDataLen(pKey->pData)) {
return false;
} else {
return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
}
} else {
return memcmp(pKey->pData, v, pKey->bytes) == 0;
}
}
...@@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
(pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) { (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes; return pInfo->pRes;
} }
} }
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doDeleteFillFinalize(pOperator); doDeleteFillFinalize(pOperator);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes; return pInfo->pRes;
} }
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
...@@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pInfo->pFillInfo->preRowKey = INT64_MIN; pInfo->pFillInfo->preRowKey = INT64_MIN;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes; return pInfo->pRes;
} }
break; break;
} }
printDataBlock(pBlock, "stream fill recv"); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) { if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId; pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
...@@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo->pFillSup->hasDelete = true; pInfo->pFillSup->hasDelete = true;
doDeleteFillResult(pOperator); doDeleteFillResult(pOperator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "stream fill delete"); printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes; return pInfo->pDelRes;
} }
continue; continue;
...@@ -1379,7 +1379,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1379,7 +1379,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
} }
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes; return pInfo->pRes;
} }
......
...@@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo ...@@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo
static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; } static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; }
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamPartitionOperatorInfo* pInfo = pOperator->info; SStreamPartitionOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pDest = pInfo->binfo.pRes; SSDataBlock* pDest = pInfo->binfo.pRes;
...@@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ...@@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pDest->info.rows; pOperator->resultInfo.totalRows += pDest->info.rows;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte); pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
ASSERT(pDest->info.rows > 0); ASSERT(pDest->info.rows > 0);
printDataBlock(pDest, "stream partitionby"); printDataBlock(pDest, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pDest; return pDest;
} }
...@@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { ...@@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
printDataBlock(pBlock, "stream partitionby recv"); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
switch (pBlock->info.type) { switch (pBlock->info.type) {
case STREAM_NORMAL: case STREAM_NORMAL:
case STREAM_PULL_DATA: case STREAM_PULL_DATA:
...@@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { ...@@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case STREAM_DELETE_DATA: { case STREAM_DELETE_DATA: {
copyDataBlock(pInfo->pDelRes, pBlock); copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pInfo->pDelRes, "stream partitionby delete"); printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes; return pInfo->pDelRes;
} break; } break;
case STREAM_CREATE_CHILD_TABLE: case STREAM_CREATE_CHILD_TABLE:
......
...@@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
}
return (p->info.rows > 0) ? p : NULL; return (p->info.rows > 0) ? p : NULL;
} }
......
...@@ -694,7 +694,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -694,7 +694,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
} }
if (pBlock->info.id.uid) { if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
} }
uint32_t status = 0; uint32_t status = 0;
...@@ -1089,7 +1089,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU ...@@ -1089,7 +1089,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
if (hasNext) { if (hasNext) {
/*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL); /*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
} }
pAPI->tsdReader.tsdReaderClose(pReader); pAPI->tsdReader.tsdReaderClose(pReader);
...@@ -1111,7 +1111,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, ...@@ -1111,7 +1111,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid); return tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
} }
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
...@@ -1344,7 +1344,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS ...@@ -1344,7 +1344,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if (rows == 0) { if (rows == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
...@@ -1361,7 +1361,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS ...@@ -1361,7 +1361,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
TSKEY startTs = srcStartTsCol[0]; TSKEY startTs = srcStartTsCol[0];
TSKEY endTs = srcEndTsCol[0]; TSKEY endTs = srcEndTsCol[0];
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver); SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver);
printDataBlock(pPreRes, "pre res"); printDataBlock(pPreRes, "pre res", GET_TASKID(pTaskInfo));
blockDataCleanup(pSrcBlock); blockDataCleanup(pSrcBlock);
int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows); int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1376,7 +1376,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS ...@@ -1376,7 +1376,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid, appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
&groupId, NULL); &groupId, NULL);
} }
printDataBlock(pSrcBlock, "new delete"); printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo));
} }
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
...@@ -1652,7 +1652,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1652,7 +1652,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pBlockInfo->version = pBlock->info.version; pBlockInfo->version = pBlock->info.version;
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pBlockInfo->id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
// todo extract method // todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
...@@ -1942,38 +1942,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1942,38 +1942,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
switch (pInfo->scanMode) { switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: { case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
printDataBlock(pInfo->pRecoverRes, "scan recover"); printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes; return pInfo->pRecoverRes;
} break; } break;
// case STREAM_SCAN_FROM_UPDATERES: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// printDataBlock(pInfo->pUpdateRes, "recover update");
// return pInfo->pUpdateRes;
// } break;
// case STREAM_SCAN_FROM_DELETE_DATA: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
// pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
// printDataBlock(pInfo->pDeleteDataRes, "recover delete");
// return pInfo->pDeleteDataRes;
// } break;
// case STREAM_SCAN_FROM_DATAREADER_RANGE: {
// SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
// if (pSDB) {
// STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
// pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
// checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "scan recover update");
// calBlockTbName(pInfo, pSDB);
// return pSDB;
// }
// blockDataCleanup(pInfo->pUpdateDataRes);
// pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
// } break;
default: default:
break; break;
} }
...@@ -1982,22 +1953,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1982,22 +1953,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pRecoverRes != NULL) { if (pInfo->pRecoverRes != NULL) {
calBlockTbName(pInfo, pInfo->pRecoverRes); calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
// if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
// } else {
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
// doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
// }
} }
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
printDataBlock(pInfo->pCreateTbRes, "recover createTbl"); printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows); qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
printDataBlock(pInfo->pRecoverRes, "scan recover"); printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes; return pInfo->pRecoverRes;
} }
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
...@@ -2053,7 +2019,7 @@ FETCH_NEXT_BLOCK: ...@@ -2053,7 +2019,7 @@ FETCH_NEXT_BLOCK:
pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} break; } break;
case STREAM_DELETE_DATA: { case STREAM_DELETE_DATA: {
printDataBlock(pBlock, "stream scan delete recv"); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
SSDataBlock* pDelBlock = NULL; SSDataBlock* pDelBlock = NULL;
if (pInfo->tqReader) { if (pInfo->tqReader) {
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
...@@ -2064,7 +2030,7 @@ FETCH_NEXT_BLOCK: ...@@ -2064,7 +2030,7 @@ FETCH_NEXT_BLOCK:
setBlockGroupIdByUid(pInfo, pDelBlock); setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id); rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered"); printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered", GET_TASKID(pTaskInfo));
if (pDelBlock->info.rows == 0) { if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) { if (pInfo->tqReader) {
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
...@@ -2075,7 +2041,7 @@ FETCH_NEXT_BLOCK: ...@@ -2075,7 +2041,7 @@ FETCH_NEXT_BLOCK:
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pDelBlock, "stream scan delete result"); printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
if (pInfo->pDeleteDataRes->info.rows > 0) { if (pInfo->pDeleteDataRes->info.rows > 0) {
...@@ -2090,7 +2056,7 @@ FETCH_NEXT_BLOCK: ...@@ -2090,7 +2056,7 @@ FETCH_NEXT_BLOCK:
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
printDataBlock(pDelBlock, "stream scan delete data"); printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
if (pInfo->tqReader) { if (pInfo->tqReader) {
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
} }
...@@ -2108,7 +2074,7 @@ FETCH_NEXT_BLOCK: ...@@ -2108,7 +2074,7 @@ FETCH_NEXT_BLOCK:
default: default:
break; break;
} }
// printDataBlock(pBlock, "stream scan recv"); printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pBlock; return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
...@@ -2144,7 +2110,7 @@ FETCH_NEXT_BLOCK: ...@@ -2144,7 +2110,7 @@ FETCH_NEXT_BLOCK:
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
printDataBlock(pSDB, "stream scan update"); printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
calBlockTbName(pInfo, pSDB); calBlockTbName(pInfo, pSDB);
return pSDB; return pSDB;
} }
...@@ -2839,11 +2805,6 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF ...@@ -2839,11 +2805,6 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) { static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
// if (pUidTagInfo->name != NULL) {
// STR_TO_VARSTR(str, pUidTagInfo->name);
// } else { // name is not retrieved during filter
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
// }
STR_TO_VARSTR(str, "ctbidx"); STR_TO_VARSTR(str, "ctbidx");
colDataSetVal(pColInfo, rowIndex, str, false); colDataSetVal(pColInfo, rowIndex, str, false);
...@@ -2912,12 +2873,14 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { ...@@ -2912,12 +2873,14 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
if (pInfo->pCtbCursor == NULL) { if (pInfo->pCtbCursor == NULL) {
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1); pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
} else {
pAPI->metaFn.resumeCtbCursor(pInfo->pCtbCursor, 0);
} }
SArray* aUidTags = pInfo->aUidTags; SArray* aUidTags = pInfo->aUidTags;
SArray* aFilterIdxs = pInfo->aFilterIdxs; SArray* aFilterIdxs = pInfo->aFilterIdxs;
int32_t count = 0; int32_t count = 0;
bool ctbCursorFinished = false;
while (1) { while (1) {
taosArrayClearEx(aUidTags, tagScanFreeUidTag); taosArrayClearEx(aUidTags, tagScanFreeUidTag);
taosArrayClear(aFilterIdxs); taosArrayClear(aFilterIdxs);
...@@ -2927,6 +2890,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { ...@@ -2927,6 +2890,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
SMCtbCursor* pCur = pInfo->pCtbCursor; SMCtbCursor* pCur = pInfo->pCtbCursor;
tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor); tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor);
if (uid == 0) { if (uid == 0) {
ctbCursorFinished = true;
break; break;
} }
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
...@@ -2955,7 +2919,15 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { ...@@ -2955,7 +2919,15 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
break; break;
} }
} }
if (count > 0) {
pAPI->metaFn.pauseCtbCursor(pInfo->pCtbCursor);
}
if (count == 0 || ctbCursorFinished) {
pAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
pInfo->pCtbCursor = NULL;
setOperatorCompleted(pOperator);
}
pRes->info.rows = count; pRes->info.rows = count;
pOperator->resultInfo.totalRows += count; pOperator->resultInfo.totalRows += count;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
...@@ -3020,7 +2992,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) { ...@@ -3020,7 +2992,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
static void destroyTagScanOperatorInfo(void* param) { static void destroyTagScanOperatorInfo(void* param) {
STagScanInfo* pInfo = (STagScanInfo*)param; STagScanInfo* pInfo = (STagScanInfo*)param;
if (pInfo->pCtbCursor != NULL) { if (pInfo->pCtbCursor != NULL) {
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1); pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
} }
taosHashCleanup(pInfo->filterCtx.colHash); taosHashCleanup(pInfo->filterCtx.colHash);
taosArrayDestroy(pInfo->filterCtx.cInfoList); taosArrayDestroy(pInfo->filterCtx.cInfoList);
...@@ -3150,7 +3122,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { ...@@ -3150,7 +3122,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
continue; continue;
} }
pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......
此差异已折叠。
...@@ -399,6 +399,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { ...@@ -399,6 +399,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_FIELD(pSubtable); CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(igLastNull); COPY_SCALAR_FIELD(igLastNull);
COPY_SCALAR_FIELD(groupOrderScan); COPY_SCALAR_FIELD(groupOrderScan);
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -143,6 +143,7 @@ SNode* addRangeClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pRange); ...@@ -143,6 +143,7 @@ SNode* addRangeClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pRange);
SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery); SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery);
SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill); SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill);
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable); SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable);
SNode* setSelectStmtTagMode(SAstCreateContext* pCxt, SNode* pStmt, bool bSelectTags);
SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight); SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight);
SDataType createDataType(uint8_t type); SDataType createDataType(uint8_t type);
...@@ -171,8 +172,7 @@ SNode* createDropTableClause(SAstCreateContext* pCxt, bool ignoreNotExists, SNod ...@@ -171,8 +172,7 @@ SNode* createDropTableClause(SAstCreateContext* pCxt, bool ignoreNotExists, SNod
SNode* createDropTableStmt(SAstCreateContext* pCxt, SNodeList* pTables); SNode* createDropTableStmt(SAstCreateContext* pCxt, SNodeList* pTables);
SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable); SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable);
SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions); SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions);
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName, SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SNode* pColDefNode);
SDataType dataType);
SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName); SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName);
SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pOldColName, SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pOldColName,
SToken* pNewColName); SToken* pNewColName);
......
...@@ -312,17 +312,17 @@ cmd ::= ALTER STABLE alter_table_clause(A). ...@@ -312,17 +312,17 @@ cmd ::= ALTER STABLE alter_table_clause(A).
alter_table_clause(A) ::= full_table_name(B) alter_table_options(C). { A = createAlterTableModifyOptions(pCxt, B, C); } alter_table_clause(A) ::= full_table_name(B) alter_table_options(C). { A = createAlterTableModifyOptions(pCxt, B, C); }
alter_table_clause(A) ::= alter_table_clause(A) ::=
full_table_name(B) ADD COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, &C, D); } full_table_name(B) ADD COLUMN column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, C); }
alter_table_clause(A) ::= full_table_name(B) DROP COLUMN column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_COLUMN, &C); } alter_table_clause(A) ::= full_table_name(B) DROP COLUMN column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_COLUMN, &C); }
alter_table_clause(A) ::= alter_table_clause(A) ::=
full_table_name(B) MODIFY COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, &C, D); } full_table_name(B) MODIFY COLUMN column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, C); }
alter_table_clause(A) ::= alter_table_clause(A) ::=
full_table_name(B) RENAME COLUMN column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, &C, &D); } full_table_name(B) RENAME COLUMN column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, &C, &D); }
alter_table_clause(A) ::= alter_table_clause(A) ::=
full_table_name(B) ADD TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, &C, D); } full_table_name(B) ADD TAG column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, C); }
alter_table_clause(A) ::= full_table_name(B) DROP TAG column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_TAG, &C); } alter_table_clause(A) ::= full_table_name(B) DROP TAG column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_TAG, &C); }
alter_table_clause(A) ::= alter_table_clause(A) ::=
full_table_name(B) MODIFY TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, &C, D); } full_table_name(B) MODIFY TAG column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, C); }
alter_table_clause(A) ::= alter_table_clause(A) ::=
full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); } full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); }
alter_table_clause(A) ::= alter_table_clause(A) ::=
...@@ -358,7 +358,7 @@ column_def_list(A) ::= column_def(B). ...@@ -358,7 +358,7 @@ column_def_list(A) ::= column_def(B).
column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); } column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); }
column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, &B, C, NULL); } column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, &B, C, NULL); }
//column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); } column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); }
%type type_name { SDataType } %type type_name { SDataType }
%destructor type_name { } %destructor type_name { }
...@@ -475,8 +475,8 @@ cmd ::= SHOW TAGS FROM table_name_cond(A) from_db_opt(B). ...@@ -475,8 +475,8 @@ cmd ::= SHOW TAGS FROM table_name_cond(A) from_db_opt(B).
cmd ::= SHOW TAGS FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, createIdentifierValueNode(pCxt, &B), createIdentifierValueNode(pCxt, &A), OP_TYPE_EQUAL); } cmd ::= SHOW TAGS FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, createIdentifierValueNode(pCxt, &B), createIdentifierValueNode(pCxt, &A), OP_TYPE_EQUAL); }
cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, A, B, C); } cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, A, B, C); }
cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, createIdentifierValueNode(pCxt, &A), createIdentifierValueNode(pCxt, &B), C); } cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, createIdentifierValueNode(pCxt, &A), createIdentifierValueNode(pCxt, &B), C); }
cmd ::= SHOW VNODES NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); } cmd ::= SHOW VNODES ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); }
cmd ::= SHOW VNODES NK_STRING(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, createValueNode(pCxt, TSDB_DATA_TYPE_VARCHAR, &A)); } cmd ::= SHOW VNODES. { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, NULL); }
// show alive // show alive
cmd ::= SHOW db_name_cond_opt(A) ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, A, QUERY_NODE_SHOW_DB_ALIVE_STMT); } cmd ::= SHOW db_name_cond_opt(A) ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, A, QUERY_NODE_SHOW_DB_ALIVE_STMT); }
cmd ::= SHOW CLUSTER ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, NULL, QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT); } cmd ::= SHOW CLUSTER ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, NULL, QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT); }
...@@ -1009,10 +1009,11 @@ join_type(A) ::= INNER. ...@@ -1009,10 +1009,11 @@ join_type(A) ::= INNER.
/************************************************ query_specification *************************************************/ /************************************************ query_specification *************************************************/
query_specification(A) ::= query_specification(A) ::=
SELECT set_quantifier_opt(B) select_list(C) from_clause_opt(D) SELECT tag_mode_opt(M) set_quantifier_opt(B) select_list(C) from_clause_opt(D)
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K) where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). { fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
A = createSelectStmt(pCxt, B, C, D); A = createSelectStmt(pCxt, B, C, D);
A = setSelectStmtTagMode(pCxt, A, M);
A = addWhereClause(pCxt, A, E); A = addWhereClause(pCxt, A, E);
A = addPartitionByClause(pCxt, A, F); A = addPartitionByClause(pCxt, A, F);
A = addWindowClauseClause(pCxt, A, G); A = addWindowClauseClause(pCxt, A, G);
...@@ -1023,6 +1024,11 @@ query_specification(A) ::= ...@@ -1023,6 +1024,11 @@ query_specification(A) ::=
A = addFillClause(pCxt, A, L); A = addFillClause(pCxt, A, L);
} }
%type tag_mode_opt { bool }
%destructor tag_mode_opt { }
tag_mode_opt(A) ::= . { A = false; }
tag_mode_opt(A) ::= TAGS. { A = true; }
%type set_quantifier_opt { bool } %type set_quantifier_opt { bool }
%destructor set_quantifier_opt { } %destructor set_quantifier_opt { }
set_quantifier_opt(A) ::= . { A = false; } set_quantifier_opt(A) ::= . { A = false; }
......
...@@ -852,6 +852,13 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr ...@@ -852,6 +852,13 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
return select; return select;
} }
SNode* setSelectStmtTagMode(SAstCreateContext* pCxt, SNode* pStmt, bool bSelectTags) {
if (pStmt && QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
((SSelectStmt*)pStmt)->tagScan = bSelectTags;
}
return pStmt;
}
static void setSubquery(SNode* pStmt) { static void setSubquery(SNode* pStmt) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
((SSelectStmt*)pStmt)->isSubquery = true; ((SSelectStmt*)pStmt)->isSubquery = true;
...@@ -1333,17 +1340,15 @@ SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, ...@@ -1333,17 +1340,15 @@ SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable,
return createAlterTableStmtFinalize(pRealTable, pStmt); return createAlterTableStmtFinalize(pRealTable, pStmt);
} }
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName, SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SNode* pColDefNode) {
SDataType dataType) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (!checkColumnName(pCxt, pColName)) { SColumnDefNode* pCol = (SColumnDefNode*)pColDefNode;
return NULL;
}
SAlterTableStmt* pStmt = (SAlterTableStmt*)nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT); SAlterTableStmt* pStmt = (SAlterTableStmt*)nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
CHECK_OUT_OF_MEM(pStmt); CHECK_OUT_OF_MEM(pStmt);
pStmt->alterType = alterType; pStmt->alterType = alterType;
COPY_STRING_FORM_ID_TOKEN(pStmt->colName, pColName); strcpy(pStmt->colName, pCol->colName);
pStmt->dataType = dataType; strcpy(pStmt->colComment, pCol->comments);
pStmt->dataType = pCol->dataType;
return createAlterTableStmtFinalize(pRealTable, pStmt); return createAlterTableStmtFinalize(pRealTable, pStmt);
} }
......
...@@ -4697,6 +4697,7 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) { ...@@ -4697,6 +4697,7 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) {
SColumnDefNode* pCol = (SColumnDefNode*)pNode; SColumnDefNode* pCol = (SColumnDefNode*)pNode;
SField field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)}; SField field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
strcpy(field.name, pCol->colName); strcpy(field.name, pCol->colName);
strcpy(field.comment, pCol->comments);
if (pCol->sma) { if (pCol->sma) {
field.flags |= COL_SMA_ON; field.flags |= COL_SMA_ON;
} }
...@@ -5044,6 +5045,7 @@ static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchem ...@@ -5044,6 +5045,7 @@ static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchem
pSchema->bytes = calcTypeBytes(pCol->dataType); pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->flags = flags; pSchema->flags = flags;
strcpy(pSchema->name, pCol->colName); strcpy(pSchema->name, pCol->colName);
strcpy(pSchema->comment, pCol->comments);
} }
typedef struct SSampleAstInfo { typedef struct SSampleAstInfo {
...@@ -7692,6 +7694,10 @@ static int32_t extractDescribeResultSchema(int32_t* numOfCols, SSchema** pSchema ...@@ -7692,6 +7694,10 @@ static int32_t extractDescribeResultSchema(int32_t* numOfCols, SSchema** pSchema
(*pSchema)[3].bytes = DESCRIBE_RESULT_NOTE_LEN; (*pSchema)[3].bytes = DESCRIBE_RESULT_NOTE_LEN;
strcpy((*pSchema)[3].name, "note"); strcpy((*pSchema)[3].name, "note");
(*pSchema)[4].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[4].bytes = DESCRIBE_RESULT_COL_COMMENT_LEN;
strcpy((*pSchema)[4].name, "comment");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -8067,8 +8073,6 @@ static int32_t rewriteShowVnodes(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -8067,8 +8073,6 @@ static int32_t rewriteShowVnodes(STranslateContext* pCxt, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (NULL != pShow->pDnodeId) { if (NULL != pShow->pDnodeId) {
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_id", pShow->pDnodeId, &pStmt->pWhere); code = createOperatorNode(OP_TYPE_EQUAL, "dnode_id", pShow->pDnodeId, &pStmt->pWhere);
} else {
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_ep", pShow->pDnodeEndpoint, &pStmt->pWhere);
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -8874,6 +8878,15 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S ...@@ -8874,6 +8878,15 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
pReq->type = pStmt->dataType.type; pReq->type = pStmt->dataType.type;
pReq->flags = COL_SMA_ON; pReq->flags = COL_SMA_ON;
pReq->bytes = calcTypeBytes(pStmt->dataType); pReq->bytes = calcTypeBytes(pStmt->dataType);
if (pStmt->colComment[0]) {
pReq->colComment = taosStrdup(pStmt->colComment);
if (pReq->colComment == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colCommentLen = strlen(pReq->colComment);
} else {
pReq->colCommentLen = -1;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -8924,6 +8937,15 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt ...@@ -8924,6 +8937,15 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pReq->colId = pSchema->colId; pReq->colId = pSchema->colId;
if (pStmt->colComment[0]) {
pReq->colComment = taosStrdup(pStmt->colComment);
if (pReq->colComment == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colCommentLen = strlen(pReq->colComment);
} else {
pReq->colCommentLen = -1;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
此差异已折叠。
...@@ -239,9 +239,9 @@ TEST_F(ParserShowToUseTest, showVgroups) { ...@@ -239,9 +239,9 @@ TEST_F(ParserShowToUseTest, showVgroups) {
TEST_F(ParserShowToUseTest, showVnodes) { TEST_F(ParserShowToUseTest, showVnodes) {
useDb("root", "test"); useDb("root", "test");
run("SHOW VNODES 1"); run("SHOW VNODES ON DNODE 1");
run("SHOW VNODES 'node1:7030'"); run("SHOW VNODES");
} }
TEST_F(ParserShowToUseTest, splitVgroup) { TEST_F(ParserShowToUseTest, splitVgroup) {
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#include "planInt.h" #include "planInt.h"
#include "filter.h"
#include "functionMgt.h" #include "functionMgt.h"
typedef struct SLogicPlanContext { typedef struct SLogicPlanContext {
...@@ -253,7 +253,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols ...@@ -253,7 +253,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
return SCAN_TYPE_SYSTEM_TABLE; return SCAN_TYPE_SYSTEM_TABLE;
} }
if (tagScan) { if (tagScan && 0 == LIST_LENGTH(pScanCols) && 0 != LIST_LENGTH(pScanPseudoCols)) {
return SCAN_TYPE_TAG; return SCAN_TYPE_TAG;
} }
...@@ -344,6 +344,55 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT ...@@ -344,6 +344,55 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
static bool needScanDefaultCol(EScanType scanType) { return SCAN_TYPE_TABLE_COUNT != scanType; } static bool needScanDefaultCol(EScanType scanType) { return SCAN_TYPE_TABLE_COUNT != scanType; }
static EDealRes tagScanNodeHasTbnameFunc(SNode* pNode, void* pContext) {
if (QUERY_NODE_FUNCTION == nodeType(pNode) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pNode)->funcType ||
(QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType)) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
static bool tagScanNodeListHasTbname(SNodeList* pCols) {
bool hasTbname = false;
nodesWalkExprs(pCols, tagScanNodeHasTbnameFunc, &hasTbname);
return hasTbname;
}
static bool tagScanNodeHasTbname(SNode* pKeys) {
bool hasTbname = false;
nodesWalkExpr(pKeys, tagScanNodeHasTbnameFunc, &hasTbname);
return hasTbname;
}
static int32_t tagScanSetExecutionMode(SScanLogicNode* pScan) {
pScan->onlyMetaCtbIdx = false;
if (tagScanNodeListHasTbname(pScan->pScanPseudoCols)) {
pScan->onlyMetaCtbIdx = false;
return TSDB_CODE_SUCCESS;
}
if (pScan->node.pConditions == NULL) {
pScan->onlyMetaCtbIdx = true;
return TSDB_CODE_SUCCESS;
}
SNode* pCond = nodesCloneNode(pScan->node.pConditions);
SNode* pTagCond = NULL;
SNode* pTagIndexCond = NULL;
filterPartitionCond(&pCond, NULL, &pTagIndexCond, &pTagCond, NULL);
if (pTagIndexCond || tagScanNodeHasTbname(pTagCond)) {
pScan->onlyMetaCtbIdx = false;
} else {
pScan->onlyMetaCtbIdx = true;
}
nodesDestroyNode(pCond);
nodesDestroyNode(pTagIndexCond);
nodesDestroyNode(pTagCond);
return TSDB_CODE_SUCCESS;
}
static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable, static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable,
SLogicNode** pLogicNode) { SLogicNode** pLogicNode) {
SScanLogicNode* pScan = NULL; SScanLogicNode* pScan = NULL;
...@@ -411,6 +460,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -411,6 +460,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets); code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets);
} }
if (pScan->scanType == SCAN_TYPE_TAG) {
code = tagScanSetExecutionMode(pScan);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pScan; *pLogicNode = (SLogicNode*)pScan;
} else { } else {
......
...@@ -1563,7 +1563,8 @@ static bool planOptNodeListHasTbname(SNodeList* pKeys) { ...@@ -1563,7 +1563,8 @@ static bool planOptNodeListHasTbname(SNodeList* pKeys) {
static bool partTagsIsOptimizableNode(SLogicNode* pNode) { static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
bool ret = 1 == LIST_LENGTH(pNode->pChildren) && bool ret = 1 == LIST_LENGTH(pNode->pChildren) &&
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)); QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)) &&
SCAN_TYPE_TAG != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->scanType;
if (!ret) return ret; if (!ret) return ret;
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_PARTITION: { case QUERY_NODE_LOGIC_PLAN_PARTITION: {
......
此差异已折叠。
...@@ -136,11 +136,11 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* ...@@ -136,11 +136,11 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char*
if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) { if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) {
tryCount++; tryCount++;
taosMsleep(1); taosMsleep(1);
qDebug("===stream===try again batchSize:%d", numOfBlocks); qDebug("try again batchSize:%d", numOfBlocks);
continue; continue;
} }
qDebug("===stream===break batchSize:%d", numOfBlocks); qDebug("break batchSize:%d", numOfBlocks);
break; break;
} }
......
...@@ -509,6 +509,7 @@ SSyncState syncGetState(int64_t rid) { ...@@ -509,6 +509,7 @@ SSyncState syncGetState(int64_t rid) {
if (pSyncNode != NULL) { if (pSyncNode != NULL) {
state.state = pSyncNode->state; state.state = pSyncNode->state;
state.roleTimeMs = pSyncNode->roleTimeMs; state.roleTimeMs = pSyncNode->roleTimeMs;
state.startTimeMs = pSyncNode->startTime;
state.restored = pSyncNode->restoreFinish; state.restored = pSyncNode->restoreFinish;
if (pSyncNode->vgId != 1) { if (pSyncNode->vgId != 1) {
state.canRead = syncNodeIsReadyForRead(pSyncNode); state.canRead = syncNodeIsReadyForRead(pSyncNode);
......
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册