未验证 提交 e393a98e 编写于 作者: D dm chen 提交者: GitHub

Merge branch '3.0' into feat/TD-22970

......@@ -168,3 +168,11 @@ All [scalar functions](../function/#scalar-functions) are available in stream pr
- [unique](../function/#unique)
- [mode](../function/#mode)
## Pause\Resume stream
1.pause stream
PAUSE STREAM [IF EXISTS] stream_name;
If "IF EXISTS" is not specified and the stream does not exist, an error will be reported; If "IF EXISTS" is specified and the stream does not exist, success is returned; If the stream exists, paused all stream tasks.
2.resume stream
RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
If "IF EXISTS" is not specified and the stream does not exist, an error will be reported. If "IF EXISTS" is specified and the stream does not exist, success is returned; If the stream exists, all of the stream tasks will be resumed. If "IGNORE UntREATED" is specified, data written during the pause period of stream is ignored when resuming stream.
......@@ -249,3 +249,12 @@ T = 最新事件时间 - DELETE_MARK
- [unique](../function/#unique)
- [mode](../function/#mode)
## 暂停、恢复流计算
1.流计算暂停计算任务
PAUSE STREAM [IF EXISTS] stream_name;
没有指定IF EXISTS,如果该stream不存在,则报错;如果存在,则暂停流计算。指定了IF EXISTS,如果该stream不存在,则返回成功;如果存在,则暂停流计算
2.流计算恢复计算任务
RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
没有指定IF EXISTS,如果该stream不存在,则报错,如果存在,则恢复流计算;指定了IF EXISTS,如果stream不存在,则返回成功;如果存在,则恢复流计算。如果指定IGNORE UNTREATED,则恢复流计算时,忽略流计算暂停期间写入的数据。
......@@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr);
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid);
......
......@@ -232,6 +232,7 @@ typedef struct SField {
uint8_t type;
int8_t flags;
int32_t bytes;
char comment[TSDB_COL_COMMENT_LEN];
} SField;
typedef struct SRetention {
......@@ -310,6 +311,7 @@ struct SSchema {
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
char comment[TSDB_COL_COMMENT_LEN];
};
struct SSchema2 {
......@@ -1164,6 +1166,9 @@ typedef struct {
int32_t vgId;
int8_t syncState;
int8_t syncRestore;
int64_t syncTerm;
int64_t roleTimeMs;
int64_t startTimeMs;
int8_t syncCanRead;
int64_t cacheUsage;
int64_t numOfTables;
......@@ -1181,9 +1186,9 @@ typedef struct {
} SVnodeLoad;
typedef struct {
int8_t syncState;
int8_t syncRestore;
int8_t syncState;
int64_t syncTerm;
int8_t syncRestore;
int64_t roleTimeMs;
} SMnodeLoad;
......@@ -2392,6 +2397,9 @@ typedef struct {
int8_t type;
int8_t flags;
int32_t bytes;
bool hasColComment;
char* colComment;
int32_t colCommentLen;
// TSDB_ALTER_TABLE_DROP_COLUMN
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
int8_t colModType;
......
......@@ -130,25 +130,25 @@
#define TK_NK_EQ 112
#define TK_USING 113
#define TK_TAGS 114
#define TK_BOOL 115
#define TK_TINYINT 116
#define TK_SMALLINT 117
#define TK_INT 118
#define TK_INTEGER 119
#define TK_BIGINT 120
#define TK_FLOAT 121
#define TK_DOUBLE 122
#define TK_BINARY 123
#define TK_NCHAR 124
#define TK_UNSIGNED 125
#define TK_JSON 126
#define TK_VARCHAR 127
#define TK_MEDIUMBLOB 128
#define TK_BLOB 129
#define TK_VARBINARY 130
#define TK_GEOMETRY 131
#define TK_DECIMAL 132
#define TK_COMMENT 133
#define TK_COMMENT 115
#define TK_BOOL 116
#define TK_TINYINT 117
#define TK_SMALLINT 118
#define TK_INT 119
#define TK_INTEGER 120
#define TK_BIGINT 121
#define TK_FLOAT 122
#define TK_DOUBLE 123
#define TK_BINARY 124
#define TK_NCHAR 125
#define TK_UNSIGNED 126
#define TK_JSON 127
#define TK_VARCHAR 128
#define TK_MEDIUMBLOB 129
#define TK_BLOB 130
#define TK_VARBINARY 131
#define TK_GEOMETRY 132
#define TK_DECIMAL 133
#define TK_MAX_DELAY 134
#define TK_WATERMARK 135
#define TK_ROLLUP 136
......@@ -354,7 +354,6 @@
#define TK_VIEW 336
#define TK_WAL 337
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602
......
......@@ -106,6 +106,8 @@ typedef struct SMCtbCursor {
void *pVal;
int kLen;
int vLen;
int8_t paused;
int lock;
} SMCtbCursor;
typedef struct SRowBuffPos {
......@@ -295,7 +297,9 @@ int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock);
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
void (*closeCtbCursor)(SMCtbCursor *pCtbCur);
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
} SStoreMeta;
......
......@@ -23,10 +23,11 @@ extern "C" {
#include "query.h"
#include "querynodes.h"
#define DESCRIBE_RESULT_COLS 4
#define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_COLS 5
#define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_COL_COMMENT_LEN (TSDB_COL_COMMENT_LEN)
#define SHOW_CREATE_DB_RESULT_COLS 2
#define SHOW_CREATE_DB_RESULT_FIELD1_LEN (TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE)
......@@ -155,7 +156,7 @@ typedef struct SColumnDefNode {
ENodeType type;
char colName[TSDB_COL_NAME_LEN];
SDataType dataType;
char comments[TSDB_TB_COMMENT_LEN];
char comments[TSDB_COL_COMMENT_LEN];
bool sma;
} SColumnDefNode;
......@@ -214,6 +215,7 @@ typedef struct SAlterTableStmt {
char newColName[TSDB_COL_NAME_LEN];
STableOptions* pOptions;
SDataType dataType;
char colComment[TSDB_COL_COMMENT_LEN];
SValueNode* pVal;
} SAlterTableStmt;
......
......@@ -243,6 +243,7 @@ typedef struct SSyncState {
int32_t progress;
SyncTerm term;
int64_t roleTimeMs;
int64_t startTimeMs;
} SSyncState;
int32_t syncInit();
......
......@@ -230,6 +230,7 @@ typedef enum ELogicConditionType {
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_TB_COMMENT_LEN 1025
#define TSDB_COL_COMMENT_LEN 1025
#define TSDB_QUERY_ID_LEN 26
#define TSDB_TRANS_OPER_LEN 16
......
......@@ -284,7 +284,6 @@ static const SSysDbTableSchema topicSchema[] = {
{.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
};
static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
......@@ -295,12 +294,13 @@ static const SSysDbTableSchema subscriptionSchema[] = {
};
static const SSysDbTableSchema vnodesSchema[] = {
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "replica", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "dnode_ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "restored", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true},
};
static const SSysDbTableSchema userUserPrivilegesSchema[] = {
......
......@@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
int32_t size = 2048 * 1024;
*pDataBuf = taosMemoryCalloc(size, 1);
char* dumpBuf = *pDataBuf;
......@@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64
"|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
"%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
if (len >= size - 1) return dumpBuf;
......
......@@ -534,6 +534,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeCStr(&encoder, pField->comment) < 0) return -1;
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
......@@ -542,6 +543,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeCStr(&encoder, pField->comment) < 0) return -1;
}
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
......@@ -608,6 +610,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.comment) < 0) return -1;
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -620,6 +623,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.comment) < 0) return -1;
if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -1079,8 +1083,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
if (tEncodeI32(&encoder, pload->learnerProgress) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, pload->roleTimeMs) < 0) return -1;
if (tEncodeI64(&encoder, pload->startTimeMs) < 0) return -1;
}
// mnode loads
......@@ -1104,6 +1108,16 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1;
if (tEncodeI8(&encoder, pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1;
// vnode extra
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
int64_t reserved = 0;
if (tEncodeI64(&encoder, pload->syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -1148,7 +1162,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad vload = {0};
int64_t reserved = 0;
vload.syncTerm = -1;
if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
if (tDecodeI8(&decoder, &vload.syncRestore) < 0) return -1;
......@@ -1161,14 +1176,15 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
if (tDecodeI32(&decoder, &vload.learnerProgress) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &vload.roleTimeMs) < 0) return -1;
if (tDecodeI64(&decoder, &vload.startTimeMs) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
// mnode loads
if (tDecodeI8(&decoder, &pReq->mload.syncState) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->mload.syncRestore) < 0) return -1;
......@@ -1199,6 +1215,17 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI8(&decoder, &pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1;
}
// vnode extra
if (!tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pLoad = taosArrayGet(pReq->pVloads, i);
int64_t reserved = 0;
if (tDecodeI64(&decoder, &pLoad->syncTerm) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......@@ -2300,7 +2327,7 @@ int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp)
}
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
......@@ -3683,7 +3710,7 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
if (totalCols > 0) {
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
......
......@@ -347,8 +347,11 @@ typedef struct {
typedef struct {
int32_t dnodeId;
ESyncState syncState;
int64_t syncTerm;
bool syncRestore;
bool syncCanRead;
int64_t roleTimeMs;
int64_t startTimeMs;
ESyncRole nodeRole;
int32_t learnerProgress;
} SVnodeGid;
......
......@@ -424,6 +424,47 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S
return 0;
}
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
bool stateChanged = false;
bool roleChanged = pGid->syncState != pVload->syncState ||
(pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
pGid->roleTimeMs != pVload->roleTimeMs;
if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
pGid->startTimeMs != pVload->startTimeMs) {
mInfo(
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
"canRead:%d, dnode:%d",
vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
pGid->syncState = pVload->syncState;
pGid->syncTerm = pVload->syncTerm;
pGid->syncRestore = pVload->syncRestore;
pGid->syncCanRead = pVload->syncCanRead;
pGid->startTimeMs = pVload->startTimeMs;
pGid->roleTimeMs = pVload->roleTimeMs;
stateChanged = true;
}
return stateChanged;
}
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
bool stateChanged = false;
bool roleChanged = pObj->syncState != pMload->syncState ||
(pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
pObj->roleTimeMs != pMload->roleTimeMs;
if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
pObj->syncTerm, pMload->syncTerm);
pObj->syncState = pMload->syncState;
pObj->syncTerm = pMload->syncTerm;
pObj->syncRestore = pMload->syncRestore;
pObj->roleTimeMs = pMload->roleTimeMs;
stateChanged = true;
}
return stateChanged;
}
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStatusReq statusReq = {0};
......@@ -496,27 +537,21 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pVgroup->compStorage = pVload->compStorage;
pVgroup->pointsWritten = pVload->pointsWritten;
}
bool roleChanged = false;
bool stateChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
if (pGid->dnodeId == statusReq.dnodeId) {
if (pGid->syncState != pVload->syncState || pGid->syncRestore != pVload->syncRestore ||
pGid->syncCanRead != pVload->syncCanRead) {
mInfo(
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
"canRead:%d, dnode:%d",
pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead,
syncStr(pVload->syncState), pVload->syncRestore, pVload->syncCanRead, pDnode->id);
pGid->syncState = pVload->syncState;
pGid->syncRestore = pVload->syncRestore;
pGid->syncCanRead = pVload->syncCanRead;
pGid->learnerProgress = pVload->learnerProgress;
roleChanged = true;
if (pVload->startTimeMs == 0) {
pVload->startTimeMs = statusReq.rebootTime;
}
if (pVload->roleTimeMs == 0) {
pVload->roleTimeMs = statusReq.rebootTime;
}
stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
break;
}
}
if (roleChanged) {
if (stateChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
......@@ -532,23 +567,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
if (pObj != NULL) {
bool roleChanged = pObj->syncState != statusReq.mload.syncState ||
(statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm);
bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore;
if (roleChanged || restoreChanged) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64
" to %" PRId64,
pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore,
statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm);
pObj->syncState = statusReq.mload.syncState;
pObj->syncRestore = statusReq.mload.syncRestore;
pObj->syncTerm = statusReq.mload.syncTerm;
if (statusReq.mload.roleTimeMs == 0) {
statusReq.mload.roleTimeMs = statusReq.rebootTime;
}
if (roleChanged) {
pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs();
}
mndUpdateMnodeState(pObj, &statusReq.mload);
mndReleaseMnode(pMnode, pObj);
}
......
......@@ -185,7 +185,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
bool roleChanged = false;
bool stateChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
if (pGid->dnodeId == dnodeId) {
......@@ -197,13 +197,14 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
pGid->syncRestore = 0;
pGid->syncCanRead = 0;
roleChanged = true;
pGid->startTimeMs = 0;
stateChanged = true;
}
break;
}
}
if (roleChanged) {
if (stateChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
......
......@@ -807,7 +807,6 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
ESdbStatus objStatus = 0;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
int64_t dummyTimeMs = 0;
pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
if (pSelfObj == NULL) {
......@@ -858,16 +857,9 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
int64_t roleTimeMs = (isDnodeOnline) ? pObj->roleTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) {
// state of old term / no status report => use dummyTimeMs
if (pObj->syncTerm > pSelfObj->syncTerm) {
mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false);
} else {
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
numOfRows++;
sdbRelease(pSdb, pObj);
......
......@@ -835,6 +835,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pSchema->bytes = pField->bytes;
pSchema->flags = pField->flags;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pSchema->comment, pField->comment, TSDB_COL_COMMENT_LEN);
pSchema->colId = pDst->nextColId;
pDst->nextColId++;
}
......@@ -848,6 +849,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
SSCHMEA_SET_IDX_ON(pSchema);
}
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pSchema->comment, pField->comment, TSDB_COL_COMMENT_LEN);
pSchema->colId = pDst->nextColId;
pDst->nextColId++;
}
......
......@@ -1075,27 +1075,24 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
int32_t cols = 0;
int64_t curMs = taosGetTimestampMs();
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
if (pShow->pIter == NULL) break;
for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SVnodeGid *pGid = &pVgroup->vnodeGid[i];
SColumnInfoData *pColInfo = NULL;
cols = 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->replica, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
char buf[20] = {0};
STR_TO_VARSTR(buf, syncStr(pVgid->syncState));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
// db_name
const char *dbname = mndGetDbStr(pVgroup->dbName);
char b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
if (dbname != NULL) {
......@@ -1106,20 +1103,33 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
// dnode is online?
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
if (pDnode == NULL) {
mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
break;
}
bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
char buf[20] = {0};
ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
STR_TO_VARSTR(buf, syncStr(syncState));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgid->dnodeId, false);
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
char b2[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
if (pDnode != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(b2, pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);
} else {
STR_WITH_MAXSIZE_TO_VARSTR(b2, "NULL", TSDB_EP_LEN + VARSTR_HEADER_SIZE);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)b2, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
numOfRows++;
sdbRelease(pSdb, pDnode);
}
sdbRelease(pSdb, pVgroup);
......
......@@ -168,7 +168,9 @@ int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
void metaPauseCtbCursor(SMCtbCursor* pCtbCur);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseStbCursor(SMStbCursor* pStbCur);
......
......@@ -423,40 +423,75 @@ SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) {
pCtbCur->pMeta = pMeta;
pCtbCur->suid = uid;
if (lock) {
metaRLock(pMeta);
}
pCtbCur->lock = lock;
pCtbCur->paused = 1;
ret = tdbTbcOpen(pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
ret = metaResumeCtbCursor(pCtbCur, 1);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pCtbCur);
return NULL;
}
return pCtbCur;
}
// move to the suid
ctbIdxKey.suid = uid;
ctbIdxKey.uid = INT64_MIN;
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
if (c > 0) {
tdbTbcMoveToNext(pCtbCur->pCur);
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
if (pCtbCur) {
if (!pCtbCur->paused) {
if (pCtbCur->pMeta && pCtbCur->lock) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) {
tdbTbcClose(pCtbCur->pCur);
}
}
tdbFree(pCtbCur->pKey);
tdbFree(pCtbCur->pVal);
}
taosMemoryFree(pCtbCur);
}
return pCtbCur;
void metaPauseCtbCursor(SMCtbCursor* pCtbCur) {
if (!pCtbCur->paused) {
tdbTbcClose((TBC*)pCtbCur->pCur);
if (pCtbCur->lock) {
metaULock(pCtbCur->pMeta);
}
pCtbCur->paused = 1;
}
}
void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) {
if (pCtbCur) {
if (pCtbCur->pMeta && lock) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) {
tdbTbcClose(pCtbCur->pCur);
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first) {
if (pCtbCur->paused) {
pCtbCur->paused = 0;
tdbFree(pCtbCur->pKey);
tdbFree(pCtbCur->pVal);
if (pCtbCur->lock) {
metaRLock(pCtbCur->pMeta);
}
int ret = 0;
ret = tdbTbcOpen(pCtbCur->pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
if (ret < 0) {
metaCloseCtbCursor(pCtbCur);
return -1;
}
taosMemoryFree(pCtbCur);
if (first) {
SCtbIdxKey ctbIdxKey;
// move to the suid
ctbIdxKey.suid = pCtbCur->suid;
ctbIdxKey.uid = INT64_MIN;
int c = 0;
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
if (c > 0) {
tdbTbcMoveToNext(pCtbCur->pCur);
}
} else {
int c = 0;
ret = tdbTbcMoveTo(pCtbCur->pCur, pCtbCur->pKey, pCtbCur->kLen, &c);
if (c < 0) {
tdbTbcMoveToPrev(pCtbCur->pCur);
} else {
tdbTbcMoveToNext(pCtbCur->pCur);
}
}
}
return 0;
}
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
......@@ -1414,7 +1449,7 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
}
taosHashCleanup(pSepecifiedUidMap);
metaCloseCtbCursor(pCur, 1);
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
......
......@@ -287,6 +287,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
// downstream task has blocked the output, stopped for a while
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
*pScanIdle = false;
// seek the stored version and extract data from WAL
......
......@@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqTrace("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
......
......@@ -98,6 +98,8 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
pMeta->openCtbCursor = metaOpenCtbCursor;
pMeta->resumeCtbCursor = metaResumeCtbCursor;
pMeta->pauseCtbCursor = metaPauseCtbCursor;
pMeta->closeCtbCursor = metaCloseCtbCursor;
pMeta->ctbCursorNext = metaCtbCursorNext;
}
......
......@@ -216,7 +216,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
cfgRsp.numOfTags = schemaTag.nCols;
cfgRsp.numOfColumns = schema.nCols;
cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags));
cfgRsp.pSchemas = (SSchema *)taosMemoryCalloc(cfgRsp.numOfColumns + cfgRsp.numOfTags, sizeof(SSchema));
memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
if (schemaTag.nCols) {
......@@ -380,6 +380,9 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = state.state;
pLoad->syncRestore = state.restored;
pLoad->syncTerm = state.term;
pLoad->roleTimeMs = state.roleTimeMs;
pLoad->startTimeMs = state.startTimeMs;
pLoad->syncCanRead = state.canRead;
pLoad->learnerProgress = state.progress;
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
......@@ -453,7 +456,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
taosArrayPush(list, &info);
}
metaCloseCtbCursor(pCur, 1);
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
......@@ -474,7 +477,7 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
taosArrayPush(list, &id);
}
metaCloseCtbCursor(pCur, 1);
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
......@@ -537,7 +540,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
++(*num);
}
metaCloseCtbCursor(pCur, 0);
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
......
......@@ -78,6 +78,10 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_NOTE_LEN, 4);
code = blockDataAppendColInfo(pBlock, &infoData);
}
if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_COL_COMMENT_LEN, 5);
code = blockDataAppendColInfo(pBlock, &infoData);
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pBlock;
......@@ -99,7 +103,9 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
SColumnInfoData* pCol3 = taosArrayGet(pBlock->pDataBlock, 2);
// Note
SColumnInfoData* pCol4 = taosArrayGet(pBlock->pDataBlock, 3);
char buf[DESCRIBE_RESULT_FIELD_LEN] = {0};
// Comment
SColumnInfoData* pCol5 = taosArrayGet(pBlock->pDataBlock, 4);
char buf[DESCRIBE_RESULT_COL_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
for (int32_t i = 0; i < numOfRows; ++i) {
if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) {
continue;
......@@ -112,6 +118,8 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
colDataSetVal(pCol3, pBlock->info.rows, (const char*)&bytes, false);
STR_TO_VARSTR(buf, i >= pMeta->tableInfo.numOfColumns ? "TAG" : "");
colDataSetVal(pCol4, pBlock->info.rows, buf, false);
STR_TO_VARSTR(buf, pMeta->schema[i].comment);
colDataSetVal(pCol5, pBlock->info.rows, buf, false);
++(pBlock->info.rows);
}
if (pBlock->info.rows <= 0) {
......@@ -456,14 +464,19 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
SSchema* pSchema = pCfg->pSchemas + i;
char type[32];
char comments[TSDB_COL_COMMENT_LEN + 16] = {0};
sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
if (pSchema->comment[0]) {
sprintf(comments, " COMMENT '%s'", pSchema->comment);
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
*len +=
sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s%s", ((i > 0) ? ", " : ""), pSchema->name, type, comments);
}
}
......@@ -471,14 +484,18 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
char type[32];
char comments[TSDB_COL_COMMENT_LEN + 16] = {0};
sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
if (pSchema->comment[0]) {
sprintf(comments, " COMMENT '%s'", pSchema->comment);
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s%s", ((i > 0) ? ", " : ""), pSchema->name, type, comments);
}
}
......
......@@ -117,7 +117,7 @@ void* tableListDestroy(STableListInfo* pTableListInfo);
void tableListClear(STableListInfo* pTableListInfo);
int32_t tableListGetOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
int32_t* num);
......@@ -183,13 +183,17 @@ void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode);
int32_t resultrowComparAsc(const void* p1, const void* p2);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI);
char* getStreamOpName(uint16_t opType);
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols);
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta);
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);
#endif // TDENGINE_EXECUTIL_H
......@@ -470,7 +470,6 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool isFinal;
SArray* pChildren;
int32_t numOfChild;
SStreamState* pState; // void
......@@ -521,7 +520,6 @@ typedef struct SStreamSessionAggOperatorInfo {
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated;
......@@ -709,6 +707,13 @@ uint64_t calcGroupId(char* pData, int32_t len);
void streamOpReleaseState(struct SOperatorInfo* pOperator);
void streamOpReloadState(struct SOperatorInfo* pOperator);
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo);
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType);
bool compareVal(const char* v, const SStateKeys* pKey);
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
#ifdef __cplusplus
}
#endif
......
......@@ -218,7 +218,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL;
}
pRes->info.id.groupId = getTableGroupId(pTableList, pRes->info.id.uid);
pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
pInfo->indexOfBufferedRes += 1;
return pRes;
} else {
......
......@@ -58,16 +58,6 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->groupId = groupId;
}
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
int64_t* ts = (int64_t*)pColData->pData;
int32_t delta = includeEndpoint ? 1 : 0;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo) {
SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo));
......@@ -250,7 +240,7 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows,
pBlock->info.rows, numOfOutput);
}
......
......@@ -1931,7 +1931,7 @@ void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psu
*type = pTableList->idInfo.tableType;
}
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL);
......@@ -2177,12 +2177,67 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_SUCCESS;
}
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
char* getStreamOpName(uint16_t opType) {
switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return "stream scan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "project";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "interval single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
return "interval final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return "interval semi";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
return "stream fill";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return "session single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return "session semi";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return "session final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return "state single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return "stream partitionby";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return "stream event";
}
return "";
}
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===%s: Block is Null or Empty", flag);
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
return;
}
char* pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
}
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
return;
}
if (qDebugFlag & DEBUG_DEBUG) {
char* pBuf = NULL;
char flagBuf[64];
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
}
}
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
int64_t* ts = (int64_t*)pColData->pData;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}
......@@ -1070,3 +1070,15 @@ void streamOpReloadState(SOperatorInfo* pOperator) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
bool compareVal(const char* v, const SStateKeys* pKey) {
if (IS_VAR_DATA_TYPE(pKey->type)) {
if (varDataLen(v) != varDataLen(pKey->pData)) {
return false;
} else {
return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
}
} else {
return memcmp(pKey->pData, v, pKey->bytes) == 0;
}
}
......@@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
(pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill");
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
}
if (pOperator->status == OP_RES_TO_RETURN) {
doDeleteFillFinalize(pOperator);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill");
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
setOperatorCompleted(pOperator);
......@@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
pInfo->pFillInfo->preRowKey = INT64_MIN;
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill");
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
break;
}
printDataBlock(pBlock, "stream fill recv");
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
......@@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo->pFillSup->hasDelete = true;
doDeleteFillResult(pOperator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "stream fill delete");
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
continue;
......@@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
printDataBlock(pInfo->pRes, "stream fill");
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
......
......@@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo
static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; }
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pDest = pInfo->binfo.pRes;
......@@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pDest->info.rows;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
ASSERT(pDest->info.rows > 0);
printDataBlock(pDest, "stream partitionby");
printDataBlock(pDest, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pDest;
}
......@@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
return NULL;
}
printDataBlock(pBlock, "stream partitionby recv");
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_PULL_DATA:
......@@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case STREAM_DELETE_DATA: {
copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pInfo->pDelRes, "stream partitionby delete");
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
} break;
default:
......
......@@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
}
return (p->info.rows > 0) ? p : NULL;
}
......
......@@ -693,7 +693,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
}
if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
}
uint32_t status = 0;
......@@ -1088,7 +1088,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
if (hasNext) {
/*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
}
pAPI->tsdReader.tsdReaderClose(pReader);
......@@ -1110,7 +1110,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
return tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
}
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
......@@ -1343,7 +1343,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if (rows == 0) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
......@@ -1360,7 +1360,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
TSKEY startTs = srcStartTsCol[0];
TSKEY endTs = srcEndTsCol[0];
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver);
printDataBlock(pPreRes, "pre res");
printDataBlock(pPreRes, "pre res", GET_TASKID(pTaskInfo));
blockDataCleanup(pSrcBlock);
int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1375,7 +1375,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
&groupId, NULL);
}
printDataBlock(pSrcBlock, "new delete");
printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo));
}
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
......@@ -1651,7 +1651,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pBlockInfo->version = pBlock->info.version;
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pBlockInfo->id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
......@@ -1921,38 +1921,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
printDataBlock(pInfo->pRecoverRes, "scan recover");
printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes;
} break;
// case STREAM_SCAN_FROM_UPDATERES: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// printDataBlock(pInfo->pUpdateRes, "recover update");
// return pInfo->pUpdateRes;
// } break;
// case STREAM_SCAN_FROM_DELETE_DATA: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
// pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
// printDataBlock(pInfo->pDeleteDataRes, "recover delete");
// return pInfo->pDeleteDataRes;
// } break;
// case STREAM_SCAN_FROM_DATAREADER_RANGE: {
// SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
// if (pSDB) {
// STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
// pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
// checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "scan recover update");
// calBlockTbName(pInfo, pSDB);
// return pSDB;
// }
// blockDataCleanup(pInfo->pUpdateDataRes);
// pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
// } break;
default:
break;
}
......@@ -1961,22 +1932,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pRecoverRes != NULL) {
calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
// if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
// } else {
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
// doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
// }
}
if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pCreateTbRes;
}
qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
printDataBlock(pInfo->pRecoverRes, "scan recover");
printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes;
}
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
......@@ -2032,7 +1998,7 @@ FETCH_NEXT_BLOCK:
pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} break;
case STREAM_DELETE_DATA: {
printDataBlock(pBlock, "stream scan delete recv");
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
SSDataBlock* pDelBlock = NULL;
if (pInfo->tqReader) {
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
......@@ -2043,7 +2009,7 @@ FETCH_NEXT_BLOCK:
setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered");
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered", GET_TASKID(pTaskInfo));
if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
......@@ -2054,7 +2020,7 @@ FETCH_NEXT_BLOCK:
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pDelBlock, "stream scan delete result");
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
blockDataDestroy(pDelBlock);
if (pInfo->pDeleteDataRes->info.rows > 0) {
......@@ -2069,7 +2035,7 @@ FETCH_NEXT_BLOCK:
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
printDataBlock(pDelBlock, "stream scan delete data");
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
}
......@@ -2084,7 +2050,7 @@ FETCH_NEXT_BLOCK:
default:
break;
}
// printDataBlock(pBlock, "stream scan recv");
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
......@@ -2120,7 +2086,7 @@ FETCH_NEXT_BLOCK:
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
printDataBlock(pSDB, "stream scan update");
printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
calBlockTbName(pInfo, pSDB);
return pSDB;
}
......@@ -2795,11 +2761,6 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
// if (pUidTagInfo->name != NULL) {
// STR_TO_VARSTR(str, pUidTagInfo->name);
// } else { // name is not retrieved during filter
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
// }
STR_TO_VARSTR(str, "ctbidx");
colDataSetVal(pColInfo, rowIndex, str, false);
......@@ -2868,12 +2829,14 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
if (pInfo->pCtbCursor == NULL) {
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
} else {
pAPI->metaFn.resumeCtbCursor(pInfo->pCtbCursor, 0);
}
SArray* aUidTags = pInfo->aUidTags;
SArray* aFilterIdxs = pInfo->aFilterIdxs;
int32_t count = 0;
bool ctbCursorFinished = false;
while (1) {
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
taosArrayClear(aFilterIdxs);
......@@ -2883,6 +2846,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
SMCtbCursor* pCur = pInfo->pCtbCursor;
tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor);
if (uid == 0) {
ctbCursorFinished = true;
break;
}
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
......@@ -2911,7 +2875,15 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
break;
}
}
if (count > 0) {
pAPI->metaFn.pauseCtbCursor(pInfo->pCtbCursor);
}
if (count == 0 || ctbCursorFinished) {
pAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
pInfo->pCtbCursor = NULL;
setOperatorCompleted(pOperator);
}
pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
......@@ -2976,7 +2948,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
static void destroyTagScanOperatorInfo(void* param) {
STagScanInfo* pInfo = (STagScanInfo*)param;
if (pInfo->pCtbCursor != NULL) {
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
}
taosHashCleanup(pInfo->filterCtx.colHash);
taosArrayDestroy(pInfo->filterCtx.cInfoList);
......@@ -3106,7 +3078,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
continue;
}
pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......
此差异已折叠。
......@@ -399,6 +399,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(igLastNull);
COPY_SCALAR_FIELD(groupOrderScan);
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
return TSDB_CODE_SUCCESS;
}
......
......@@ -143,6 +143,7 @@ SNode* addRangeClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pRange);
SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery);
SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill);
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable);
SNode* setSelectStmtTagMode(SAstCreateContext* pCxt, SNode* pStmt, bool bSelectTags);
SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight);
SDataType createDataType(uint8_t type);
......@@ -171,8 +172,7 @@ SNode* createDropTableClause(SAstCreateContext* pCxt, bool ignoreNotExists, SNod
SNode* createDropTableStmt(SAstCreateContext* pCxt, SNodeList* pTables);
SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable);
SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions);
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName,
SDataType dataType);
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SNode* pColDefNode);
SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName);
SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pOldColName,
SToken* pNewColName);
......
......@@ -312,17 +312,17 @@ cmd ::= ALTER STABLE alter_table_clause(A).
alter_table_clause(A) ::= full_table_name(B) alter_table_options(C). { A = createAlterTableModifyOptions(pCxt, B, C); }
alter_table_clause(A) ::=
full_table_name(B) ADD COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, &C, D); }
full_table_name(B) ADD COLUMN column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, C); }
alter_table_clause(A) ::= full_table_name(B) DROP COLUMN column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_COLUMN, &C); }
alter_table_clause(A) ::=
full_table_name(B) MODIFY COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, &C, D); }
full_table_name(B) MODIFY COLUMN column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, C); }
alter_table_clause(A) ::=
full_table_name(B) RENAME COLUMN column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, &C, &D); }
alter_table_clause(A) ::=
full_table_name(B) ADD TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, &C, D); }
full_table_name(B) ADD TAG column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, C); }
alter_table_clause(A) ::= full_table_name(B) DROP TAG column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_TAG, &C); }
alter_table_clause(A) ::=
full_table_name(B) MODIFY TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, &C, D); }
full_table_name(B) MODIFY TAG column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, C); }
alter_table_clause(A) ::=
full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); }
alter_table_clause(A) ::=
......@@ -358,7 +358,7 @@ column_def_list(A) ::= column_def(B).
column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); }
column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, &B, C, NULL); }
//column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); }
column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); }
%type type_name { SDataType }
%destructor type_name { }
......@@ -475,8 +475,8 @@ cmd ::= SHOW TAGS FROM table_name_cond(A) from_db_opt(B).
cmd ::= SHOW TAGS FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, createIdentifierValueNode(pCxt, &B), createIdentifierValueNode(pCxt, &A), OP_TYPE_EQUAL); }
cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, A, B, C); }
cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, createIdentifierValueNode(pCxt, &A), createIdentifierValueNode(pCxt, &B), C); }
cmd ::= SHOW VNODES NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); }
cmd ::= SHOW VNODES NK_STRING(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, createValueNode(pCxt, TSDB_DATA_TYPE_VARCHAR, &A)); }
cmd ::= SHOW VNODES ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); }
cmd ::= SHOW VNODES. { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, NULL); }
// show alive
cmd ::= SHOW db_name_cond_opt(A) ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, A, QUERY_NODE_SHOW_DB_ALIVE_STMT); }
cmd ::= SHOW CLUSTER ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, NULL, QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT); }
......@@ -1009,10 +1009,11 @@ join_type(A) ::= INNER.
/************************************************ query_specification *************************************************/
query_specification(A) ::=
SELECT set_quantifier_opt(B) select_list(C) from_clause_opt(D)
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
SELECT tag_mode_opt(M) set_quantifier_opt(B) select_list(C) from_clause_opt(D)
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
A = createSelectStmt(pCxt, B, C, D);
A = setSelectStmtTagMode(pCxt, A, M);
A = addWhereClause(pCxt, A, E);
A = addPartitionByClause(pCxt, A, F);
A = addWindowClauseClause(pCxt, A, G);
......@@ -1023,6 +1024,11 @@ query_specification(A) ::=
A = addFillClause(pCxt, A, L);
}
%type tag_mode_opt { bool }
%destructor tag_mode_opt { }
tag_mode_opt(A) ::= . { A = false; }
tag_mode_opt(A) ::= TAGS. { A = true; }
%type set_quantifier_opt { bool }
%destructor set_quantifier_opt { }
set_quantifier_opt(A) ::= . { A = false; }
......
......@@ -852,6 +852,13 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
return select;
}
SNode* setSelectStmtTagMode(SAstCreateContext* pCxt, SNode* pStmt, bool bSelectTags) {
if (pStmt && QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
((SSelectStmt*)pStmt)->tagScan = bSelectTags;
}
return pStmt;
}
static void setSubquery(SNode* pStmt) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
((SSelectStmt*)pStmt)->isSubquery = true;
......@@ -1333,17 +1340,15 @@ SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable,
return createAlterTableStmtFinalize(pRealTable, pStmt);
}
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName,
SDataType dataType) {
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SNode* pColDefNode) {
CHECK_PARSER_STATUS(pCxt);
if (!checkColumnName(pCxt, pColName)) {
return NULL;
}
SColumnDefNode* pCol = (SColumnDefNode*)pColDefNode;
SAlterTableStmt* pStmt = (SAlterTableStmt*)nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->alterType = alterType;
COPY_STRING_FORM_ID_TOKEN(pStmt->colName, pColName);
pStmt->dataType = dataType;
strcpy(pStmt->colName, pCol->colName);
strcpy(pStmt->colComment, pCol->comments);
pStmt->dataType = pCol->dataType;
return createAlterTableStmtFinalize(pRealTable, pStmt);
}
......
......@@ -4697,6 +4697,7 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) {
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
SField field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)};
strcpy(field.name, pCol->colName);
strcpy(field.comment, pCol->comments);
if (pCol->sma) {
field.flags |= COL_SMA_ON;
}
......@@ -5044,6 +5045,7 @@ static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchem
pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->flags = flags;
strcpy(pSchema->name, pCol->colName);
strcpy(pSchema->comment, pCol->comments);
}
typedef struct SSampleAstInfo {
......@@ -7692,6 +7694,10 @@ static int32_t extractDescribeResultSchema(int32_t* numOfCols, SSchema** pSchema
(*pSchema)[3].bytes = DESCRIBE_RESULT_NOTE_LEN;
strcpy((*pSchema)[3].name, "note");
(*pSchema)[4].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[4].bytes = DESCRIBE_RESULT_COL_COMMENT_LEN;
strcpy((*pSchema)[4].name, "comment");
return TSDB_CODE_SUCCESS;
}
......@@ -8067,8 +8073,6 @@ static int32_t rewriteShowVnodes(STranslateContext* pCxt, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pShow->pDnodeId) {
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_id", pShow->pDnodeId, &pStmt->pWhere);
} else {
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_ep", pShow->pDnodeEndpoint, &pStmt->pWhere);
}
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -8874,6 +8878,15 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
pReq->type = pStmt->dataType.type;
pReq->flags = COL_SMA_ON;
pReq->bytes = calcTypeBytes(pStmt->dataType);
if (pStmt->colComment[0]) {
pReq->colComment = taosStrdup(pStmt->colComment);
if (pReq->colComment == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colCommentLen = strlen(pReq->colComment);
} else {
pReq->colCommentLen = -1;
}
return TSDB_CODE_SUCCESS;
}
......@@ -8924,6 +8937,15 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colId = pSchema->colId;
if (pStmt->colComment[0]) {
pReq->colComment = taosStrdup(pStmt->colComment);
if (pReq->colComment == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colCommentLen = strlen(pReq->colComment);
} else {
pReq->colCommentLen = -1;
}
return TSDB_CODE_SUCCESS;
}
......
此差异已折叠。
......@@ -239,9 +239,9 @@ TEST_F(ParserShowToUseTest, showVgroups) {
TEST_F(ParserShowToUseTest, showVnodes) {
useDb("root", "test");
run("SHOW VNODES 1");
run("SHOW VNODES ON DNODE 1");
run("SHOW VNODES 'node1:7030'");
run("SHOW VNODES");
}
TEST_F(ParserShowToUseTest, splitVgroup) {
......
......@@ -14,7 +14,7 @@
*/
#include "planInt.h"
#include "filter.h"
#include "functionMgt.h"
typedef struct SLogicPlanContext {
......@@ -253,7 +253,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
return SCAN_TYPE_SYSTEM_TABLE;
}
if (tagScan) {
if (tagScan && 0 == LIST_LENGTH(pScanCols) && 0 != LIST_LENGTH(pScanPseudoCols)) {
return SCAN_TYPE_TAG;
}
......@@ -344,6 +344,55 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
static bool needScanDefaultCol(EScanType scanType) { return SCAN_TYPE_TABLE_COUNT != scanType; }
static EDealRes tagScanNodeHasTbnameFunc(SNode* pNode, void* pContext) {
if (QUERY_NODE_FUNCTION == nodeType(pNode) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pNode)->funcType ||
(QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType)) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
static bool tagScanNodeListHasTbname(SNodeList* pCols) {
bool hasTbname = false;
nodesWalkExprs(pCols, tagScanNodeHasTbnameFunc, &hasTbname);
return hasTbname;
}
static bool tagScanNodeHasTbname(SNode* pKeys) {
bool hasTbname = false;
nodesWalkExpr(pKeys, tagScanNodeHasTbnameFunc, &hasTbname);
return hasTbname;
}
static int32_t tagScanSetExecutionMode(SScanLogicNode* pScan) {
pScan->onlyMetaCtbIdx = false;
if (tagScanNodeListHasTbname(pScan->pScanPseudoCols)) {
pScan->onlyMetaCtbIdx = false;
return TSDB_CODE_SUCCESS;
}
if (pScan->node.pConditions == NULL) {
pScan->onlyMetaCtbIdx = true;
return TSDB_CODE_SUCCESS;
}
SNode* pCond = nodesCloneNode(pScan->node.pConditions);
SNode* pTagCond = NULL;
SNode* pTagIndexCond = NULL;
filterPartitionCond(&pCond, NULL, &pTagIndexCond, &pTagCond, NULL);
if (pTagIndexCond || tagScanNodeHasTbname(pTagCond)) {
pScan->onlyMetaCtbIdx = false;
} else {
pScan->onlyMetaCtbIdx = true;
}
nodesDestroyNode(pCond);
nodesDestroyNode(pTagIndexCond);
nodesDestroyNode(pTagCond);
return TSDB_CODE_SUCCESS;
}
static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable,
SLogicNode** pLogicNode) {
SScanLogicNode* pScan = NULL;
......@@ -411,6 +460,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets);
}
if (pScan->scanType == SCAN_TYPE_TAG) {
code = tagScanSetExecutionMode(pScan);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pScan;
} else {
......
......@@ -1563,7 +1563,8 @@ static bool planOptNodeListHasTbname(SNodeList* pKeys) {
static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
bool ret = 1 == LIST_LENGTH(pNode->pChildren) &&
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0));
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)) &&
SCAN_TYPE_TAG != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->scanType;
if (!ret) return ret;
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_PARTITION: {
......
......@@ -362,7 +362,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
msgLen, ver, total, size + msgLen/1048576.0);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
size);
......
......@@ -191,6 +191,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
return 0;
}
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
qDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
taosMsleep(10000);
continue;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
......@@ -444,11 +450,11 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
qDebug("try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
continue;
}
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
qDebug("break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS;
}
......
......@@ -130,11 +130,11 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char*
if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) {
tryCount++;
taosMsleep(1);
qDebug("===stream===try again batchSize:%d", numOfBlocks);
qDebug("try again batchSize:%d", numOfBlocks);
continue;
}
qDebug("===stream===break batchSize:%d", numOfBlocks);
qDebug("break batchSize:%d", numOfBlocks);
break;
}
......
......@@ -524,6 +524,7 @@ SSyncState syncGetState(int64_t rid) {
if (pSyncNode != NULL) {
state.state = pSyncNode->state;
state.roleTimeMs = pSyncNode->roleTimeMs;
state.startTimeMs = pSyncNode->startTime;
state.restored = pSyncNode->restoreFinish;
if (pSyncNode->vgId != 1) {
state.canRead = syncNodeIsReadyForRead(pSyncNode);
......
......@@ -107,35 +107,39 @@ if $data30 != 12 then
return -1
endi
print =============== show vnodes
sql show vnodes 1
print =============== show vnodes on dnode 1
sql show vnodes on dnode 1
if $rows != 9 then
return -1
endi
if $data(4)[1] != 1 then
return -1
if $data10 != 1 then
return -1
endi
if $data(4)[2] != leader then
return -1
if $data11 != 5 then
return -1
endi
if $data(4)[3] != d2 then
return -1
if $data12 != d2 then
return -1
endi
if $data(4)[4] != 1 then
return -1
if $data13 != leader then
return -1
endi
if $data(4)[5] != localhost:7100 then
return -1
print $data14
print $data15
if $data16 != 1 then
return -1
endi
sql show vnodes 'localhost:7100'
print ================ show vnodes
sql show vnodes
if $rows != 9 then
return -1
return -1
endi
print =============== drop database
......
此差异已折叠。
......@@ -578,18 +578,40 @@ class TMQCom:
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
def killProcesser(self, processerName):
killCmd = (
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1"
% processerName
)
psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % processerName
processID = subprocess.check_output(psCmd, shell=True)
if platform.system().lower() == 'windows':
killCmd = ("wmic process where name=\"%s.exe\" call terminate > NUL 2>&1" % processerName)
psCmd = ("wmic process where name=\"%s.exe\" | findstr \"%s.exe\"" % (processerName, processerName))
else:
killCmd = (
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1"
% processerName
)
psCmd = ("ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % processerName)
processID = ""
try:
processID = subprocess.check_output(psCmd, shell=True)
except Exception as err:
processID = ""
print('**** warn: ', err)
while processID:
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True)
try:
processID = subprocess.check_output(psCmd, shell=True)
except Exception as err:
processID = ""
print('**** warn: ', err)
def startProcess(self, processName, param):
if platform.system().lower() == 'windows':
cmd = f"mintty -h never %s %s > NUL 2>&1" % (processName, param)
else:
cmd = f"nohup %s %s > /dev/null 2>&1 &" % (processName, param)
tdLog.info("%s"%(cmd))
os.system(cmd)
def close(self):
self.cursor.close()
......
......@@ -176,9 +176,7 @@ class TDTestCase:
# use taosBenchmark to subscribe
binPath = self.getPath()
cmd = "nohup %s -f ./7-tmq/tmqDropConsumer.json > /dev/null 2>&1 & " % binPath
tdLog.info("%s"%(cmd))
os.system(cmd)
tmqCom.startProcess(binPath, "-f ./7-tmq/tmqDropConsumer.json")
expectTopicNum = len(topicNameList)
consumerThreadNum = 2
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册