提交 60982694 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj

......@@ -30,7 +30,7 @@
fprintf(stderr, "\033[0m"); } while(0)
int64_t g_num_of_tb = 2;
int64_t g_num_of_rec = 2;
int64_t g_num_of_rec = 3;
static struct argp_option options[] = {
{"tables", 't', "NUMBER", 0, "Number of child tables, default is 10000."},
......@@ -42,10 +42,18 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
switch (key) {
case 't':
g_num_of_tb = atoll(arg);
if (g_num_of_tb < 1) {
warnPrint("minimal g_num_of_tb is %d\n", 1);
g_num_of_tb = 1;
}
break;
case 'n':
g_num_of_rec = atoll(arg);
if (g_num_of_rec < 2) {
warnPrint("minimal g_num_of_rec is %d\n", 2);
g_num_of_rec = 2;
}
break;
}
......@@ -65,15 +73,32 @@ static void prepare_data(TAOS* taos) {
usleep(100000);
taos_select_db(taos, "test");
res = taos_query(taos, "create table meters(ts timestamp, f float, n int, b binary(20), c nchar(20)) tags(area int, city binary(20), dist nchar(20));");
char command[1024] = {0};
sprintf(command, "%s", "create table meters(ts timestamp, f float, n int, bin1 binary(20), c nchar(20), bin2 binary(20)) tags(area int, city binary(20), dist nchar(20), street binary(20));");
res = taos_query(taos, command);
if ((res) && (0 == taos_errno(res))) {
okPrint("%s created\n", "meters");
} else {
errorPrint("%s() LN%d: %s\n",
__func__, __LINE__, taos_errstr(res));
taos_free_result(res);
exit(1);
}
taos_free_result(res);
char command[1024] = {0};
for (int64_t i = 0; i < g_num_of_tb; i ++) {
// sprintf(command, "create table t%"PRId64" using meters tags(%"PRId64", '%s', '%s');",
// i, i, (i%2)?"beijing":"shanghai", (i%2)?"朝阳区":"黄浦区");
sprintf(command, "create table t%"PRId64" using meters tags(%"PRId64", '%s', '%s');",
i, i, (i%2)?"beijing":"shanghai", (i%2)?"chaoyang":"huangpu");
sprintf(command, "create table t%"PRId64" using meters "
"tags(%"PRId64", '%s', '%s', '%s');",
i, i, (i%2)?"beijing":"shanghai",
(i%2)?"朝阳区":"黄浦区",
(i%2)?"长安街":"中山路");
/* sprintf(command, "create table t%"PRId64" using meters "
"tags(%"PRId64", '%s', '%s', '%s');",
i, i,
(i%2)?"beijing":"shanghai",
(i%2)?"chaoyang":"huangpu",
(i%2?"changan street":"jianguo rd"));
*/
res = taos_query(taos, command);
if ((res) && (0 == taos_errno(res))) {
okPrint("t%" PRId64 " created\n", i);
......@@ -86,11 +111,15 @@ static void prepare_data(TAOS* taos) {
int64_t j = 0;
int64_t total = 0;
int64_t affected;
for (; j < g_num_of_rec -1; j ++) {
for (; j < g_num_of_rec -2; j ++) {
sprintf(command, "insert into t%"PRId64" "
"values(%" PRId64 ", %f, %"PRId64", '%c%d', '%c%d')",
i, 1650000000000+j, (float)j, j, 'a'+(int)j%25, rand(),
'z' - (int)j%25, rand());
"values(%" PRId64 ", %f, %"PRId64", "
"'%c%d', '%s%c%d', '%c%d')",
i, 1650000000000+j, (float)j, j,
'a'+(int)j%25, rand(),
"涛思", 'z' - (int)j%25, rand(),
'b' - (int)j%25, rand()
);
res = taos_query(taos, command);
if ((res) && (0 == taos_errno(res))) {
affected = taos_affected_rows(res);
......@@ -101,8 +130,25 @@ static void prepare_data(TAOS* taos) {
}
taos_free_result(res);
}
sprintf(command, "insert into t%"PRId64" values(%" PRId64 ", NULL, NULL, NULL, NULL)",
i, 1650000000000+j+1);
sprintf(command, "insert into t%"PRId64" values(%" PRId64 ", "
"NULL, NULL, NULL, NULL, NULL)",
i, 1650000000000+j);
res = taos_query(taos, command);
if ((res) && (0 == taos_errno(res))) {
affected = taos_affected_rows(res);
total += affected;
} else {
errorPrint("%s() LN%d: %s\n",
__func__, __LINE__, taos_errstr(res));
}
sprintf(command, "insert into t%"PRId64" "
"values(%" PRId64 ", %f, %"PRId64", "
"'%c%d', '%s%c%d', '%c%d')",
i, 1650000000000+j+1, (float)j, j,
'a'+(int)j%25, rand(),
"数据", 'z' - (int)j%25, rand(),
'b' - (int)j%25, rand()
);
res = taos_query(taos, command);
if ((res) && (0 == taos_errno(res))) {
affected = taos_affected_rows(res);
......@@ -113,7 +159,8 @@ static void prepare_data(TAOS* taos) {
}
taos_free_result(res);
printf("insert %"PRId64" records into t%"PRId64", total affected rows: %"PRId64"\n", j, i, total);
okPrint("insert %"PRId64" records into t%"PRId64", "
"total affected rows: %"PRId64"\n", j, i, total);
}
}
......@@ -127,29 +174,63 @@ static int print_result(char *tbname, TAOS_RES* res, int block) {
printf("fields[%d].name=%s, fields[%d].type=%d, fields[%d].bytes=%d\n",
f, fields[f].name, f, fields[f].type, f, fields[f].bytes);
}
if (block) {
warnPrint("%s() LN%d, call taos_fetch_block()\n", __func__, __LINE__);
warnPrint("%s", "call taos_fetch_block()\n");
int rows = 0;
while ((rows = taos_fetch_block(res, &row))) {
for (int f = 0; f < num_fields; f++) {
if ((fields[f].type != TSDB_DATA_TYPE_VARCHAR)
&& (fields[f].type != TSDB_DATA_TYPE_NCHAR)
&& (fields[f].type != TSDB_DATA_TYPE_JSON)) {
printf("col%d type is %d, no need get offset\n",
f, fields[f].type);
continue;
}
int *offsets = taos_get_column_data_offset(res, f);
if (offsets) {
for (int c = 0; c < rows; c++) {
if (offsets[c] != -1) {
int length = *(int16_t*)(row[f] + offsets[c]);
char *buf = calloc(1, length + 1);
strncpy(buf, (char *)(row[f] + offsets[c] + 2), length);
printf("row: %d, col: %d, offset: %d, length: %d, content: %s\n",
c, f, offsets[c], length, buf);
free(buf);
} else {
printf("row: %d, col: %d, offset: -1, means content is NULL\n",
c, f);
}
}
} else {
errorPrint("%s() LN%d: col%d's lengths is NULL\n",
__func__, __LINE__, f);
}
}
num_rows += rows;
}
} else {
warnPrint("%s() LN%d, call taos_fetch_rows()\n", __func__, __LINE__);
warnPrint("%s", "call taos_fetch_rows()\n");
while ((row = taos_fetch_row(res))) {
char temp[256] = {0};
taos_print_row(temp, row, fields, num_fields);
puts(temp);
num_rows ++;
int* lengths = taos_fetch_lengths(res);
if (lengths) {
for (int c = 0; c < num_fields; c++) {
printf("length of column %d is %d\n", c, lengths[c]);
printf("row: %"PRId64", col: %d, is_null: %s, length of column %d is %d\n",
num_rows, c,
taos_is_null(res, num_rows, c)?"True":"False",
c, lengths[c]);
}
} else {
errorPrint("%s() LN%d: %s's lengths is NULL\n",
__func__, __LINE__, tbname);
}
num_rows ++;
}
}
......@@ -172,8 +253,8 @@ static void verify_query(TAOS* taos) {
int field_count = taos_field_count(res);
printf("field_count: %d\n", field_count);
int64_t rows = print_result(tbname, res, i % 2);
printf("rows is: %"PRId64"\n", rows);
okPrint("total query %s result rows is: %"PRId64"\n",
tbname, rows);
} else {
errorPrint("%s() LN%d: %s\n",
__func__, __LINE__, taos_errstr(res));
......@@ -207,7 +288,7 @@ int main(int argc, char *argv[]) {
verify_query(taos);
taos_close(taos);
printf("done\n");
okPrint("%s", "done\n");
return 0;
}
......
......@@ -1209,12 +1209,17 @@ typedef struct {
int32_t code;
} STaskDropRsp;
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char outputSTbName[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
char* sql;
char* ast;
int8_t triggerType;
int64_t watermark;
} SCMCreateStreamReq;
typedef struct {
......
......@@ -162,38 +162,39 @@ typedef struct {
int32_t extendedRowSize;
} SRowBuilder;
#define TD_ROW_HEAD_LEN (sizeof(STSRow))
#define TD_ROW_HEAD_LEN (sizeof(STSRow))
#define TD_ROW_NCOLS_LEN (sizeof(col_id_t))
#define TD_ROW_INFO(r) ((r)->info)
#define TD_ROW_TYPE(r) ((r)->type)
#define TD_ROW_DELETE(r) ((r)->del)
#define TD_ROW_ENDIAN(r) ((r)->endian)
#define TD_ROW_SVER(r) ((r)->sver)
#define TD_ROW_NCOLS(r) ((r)->data) // only valid for SKvRow
#define TD_ROW_DATA(r) ((r)->data)
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_KEY(r) ((r)->ts)
#define TD_ROW_INFO(r) ((r)->info)
#define TD_ROW_TYPE(r) ((r)->type)
#define TD_ROW_DELETE(r) ((r)->del)
#define TD_ROW_ENDIAN(r) ((r)->endian)
#define TD_ROW_SVER(r) ((r)->sver)
#define TD_ROW_NCOLS(r) ((r)->data) // only valid for SKvRow
#define TD_ROW_DATA(r) ((r)->data)
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_KEY(r) ((r)->ts)
#define TD_ROW_VER(r) ((r)->ver)
#define TD_ROW_KEY_ADDR(r) (r)
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (schemaTLen(s) + TD_ROW_HEAD_LEN)
#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i))
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1)
#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v))
#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l))
#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i))
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1)
#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v))
#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l))
#define TD_ROW_SET_NCOLS(r, n) (*(col_id_t *)TD_ROW_NCOLS(r) = (n))
#define TD_ROW_IS_DELETED(r) (TD_ROW_DELETE(r) == 1)
#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP)
#define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV)
#define TD_IS_TP_ROW_T(t) ((t) == TD_ROW_TP)
#define TD_IS_KV_ROW_T(t) ((t) == TD_ROW_KV)
#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP)
#define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV)
#define TD_IS_TP_ROW_T(t) ((t) == TD_ROW_TP)
#define TD_IS_KV_ROW_T(t) ((t) == TD_ROW_KV)
#define TD_BOOL_STR(b) ((b) ? "true" : "false")
#define TD_BOOL_STR(b) ((b) ? "true" : "false")
#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert))
#define TD_ROW_COL_IDX(r) POINTER_SHIFT(TD_ROW_DATA(r), sizeof(col_id_t))
......
......@@ -272,14 +272,9 @@ typedef struct SKillStmt {
int32_t targetId;
} SKillStmt;
typedef enum EStreamTriggerType {
STREAM_TRIGGER_AT_ONCE = 1,
STREAM_TRIGGER_WINDOW_CLOSE
} EStreamTriggerType;
typedef struct SStreamOptions {
ENodeType type;
EStreamTriggerType triggerType;
int8_t triggerType;
SNode* pWatermark;
} SStreamOptions;
......
......@@ -109,6 +109,8 @@ typedef struct SWindowLogicNode {
int64_t sessionGap;
SNode* pTspk;
SNode* pStateExpr;
int8_t triggerType;
int64_t watermark;
} SWindowLogicNode;
typedef struct SSortLogicNode {
......@@ -251,6 +253,8 @@ typedef struct SWinodwPhysiNode {
SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs;
SNode* pTspk; // timestamp primary key
int8_t triggerType;
int64_t watermark;
} SWinodwPhysiNode;
typedef struct SIntervalPhysiNode {
......
......@@ -30,6 +30,8 @@ typedef struct SPlanContext {
bool topicQuery;
bool streamQuery;
bool showRewrite;
int8_t triggerType;
int64_t watermark;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -597,6 +597,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST TAOS_DEF_ERROR_CODE(0, 0x2624)
#define TSDB_CODE_PAR_INVALID_OPTION_UNIT TAOS_DEF_ERROR_CODE(0, 0x2625)
#define TSDB_CODE_PAR_INVALID_KEEP_UNIT TAOS_DEF_ERROR_CODE(0, 0x2626)
#define TSDB_CODE_PAR_AGG_FUNC_NESTING TAOS_DEF_ERROR_CODE(0, 0x2627)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
......@@ -3381,6 +3381,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
......@@ -3404,6 +3406,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->watermark) < 0) return -1;
if (sqlLen > 0) {
pReq->sql = taosMemoryCalloc(1, sqlLen + 1);
......
......@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans);
#ifdef __cplusplus
}
......
......@@ -106,6 +106,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, DB_ENCODE_OVER)
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
TASSERT(taosArrayGetSize(pDb->cfg.pRetensions) == pDb->cfg.numOfRetensions);
SRetention *pRetension = taosArrayGet(pDb->cfg.pRetensions, i);
SDB_SET_INT32(pRaw, dataPos, pRetension->freq, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pRetension->keep, DB_ENCODE_OVER)
......
......@@ -429,7 +429,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......
......@@ -218,7 +218,7 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0;
}
static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) {
if (NULL == ast) {
return TSDB_CODE_SUCCESS;
}
......@@ -232,6 +232,8 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = triggerType,
.watermark = watermark,
};
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
......@@ -245,7 +247,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
return code;
}
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) {
SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) {
......@@ -265,7 +267,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
#endif
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, triggerType, watermark, &pStream->physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pStream->name, terrstr());
return -1;
}
......@@ -313,7 +315,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
}
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) {
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
......
......@@ -314,8 +314,7 @@ static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) {
}
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP,
TSDB_SMA_DNAME[smaType]);
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TSDB_SMA_DNAME[smaType]);
}
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) {
......
......@@ -1104,6 +1104,8 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
static const char* jkWindowPhysiPlanExprs = "Exprs";
static const char* jkWindowPhysiPlanFuncs = "Funcs";
static const char* jkWindowPhysiPlanTsPk = "TsPk";
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
static const char* jkWindowPhysiPlanWatermark = "Watermark";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
......@@ -1118,6 +1120,12 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowPhysiPlanTsPk, nodeToJson, pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
}
return code;
}
......@@ -1135,6 +1143,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
}
return code;
}
......
......@@ -427,7 +427,7 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A)
stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, B, C, D); }
stream_options(B) into_opt(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, D); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
into_opt(A) ::= . { A = NULL; }
......
......@@ -481,6 +481,14 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
return DEAL_RES_CONTINUE;
}
static EDealRes haveAggFunction(SNode* pNode, void* pContext) {
if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId)) {
*((bool*)pContext) = true;
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
......@@ -492,6 +500,11 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
if (fmIsAggFunc(pFunc->funcId) && beforeHaving(pCxt->currClause)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION);
}
bool haveAggFunc = false;
nodesWalkExprs(pFunc->pParameterList, haveAggFunction, &haveAggFunc);
if (haveAggFunc) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING);
}
return DEAL_RES_CONTINUE;
}
......@@ -2173,6 +2186,14 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pWatermark) {
code = (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark)) ? pCxt->errCode : TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS == code) {
createReq.triggerType = pStmt->pOptions->triggerType;
createReq.watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq);
}
......
......@@ -91,6 +91,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Invalid option %s unit: %c, only m, h, d allowed";
case TSDB_CODE_PAR_INVALID_KEEP_UNIT:
return "Invalid option keep unit: %c, %c, %c, only m, h, d allowed";
case TSDB_CODE_PAR_AGG_FUNC_NESTING:
return "Aggregate functions do not support nesting";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:
......
......@@ -3452,7 +3452,7 @@ static YYACTIONTYPE yy_reduce(
{ yymsp[-1].minor.yy100 = strtol(yymsp[0].minor.yy0.z, NULL, 10); }
break;
case 234: /* cmd ::= CREATE STREAM not_exists_opt stream_name stream_options into_opt AS query_expression */
{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy659, &yymsp[-4].minor.yy479, yymsp[-3].minor.yy452, yymsp[-2].minor.yy452, yymsp[0].minor.yy452); }
{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy659, &yymsp[-4].minor.yy479, yymsp[-2].minor.yy452, yymsp[-3].minor.yy452, yymsp[0].minor.yy452); }
break;
case 235: /* cmd ::= DROP STREAM exists_opt stream_name */
{ pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy659, &yymsp[0].minor.yy479); }
......
......@@ -463,6 +463,11 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) {
int32_t code = nodesCollectFuncs(pSelect, fmIsWindowClauseFunc, &pWindow->pFuncs);
if (pCxt->pPlanCxt->streamQuery) {
pWindow->triggerType = pCxt->pPlanCxt->triggerType;
pWindow->watermark = pCxt->pPlanCxt->watermark;
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
}
......
......@@ -56,6 +56,20 @@ typedef enum ECondAction {
// after supporting outer join, there are other possibilities
} ECondAction;
EDealRes haveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
*((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static bool haveNormalCol(SNodeList* pList) {
bool res = false;
nodesWalkExprsPostOrder(pList, haveNormalColImpl, &res);
return res;
}
static bool osdMayBeOptimized(SLogicNode* pNode) {
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) {
return false;
......@@ -67,7 +81,10 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
(QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) {
return false;
}
return true;
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
}
return !haveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
}
static SLogicNode* osdFindPossibleScanNode(SLogicNode* pNode) {
......
......@@ -796,6 +796,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
}
}
pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark;
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pWindow;
} else {
......
......@@ -45,7 +45,7 @@ protected:
int32_t code = qParseQuerySql(&cxt_, &query_);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] parser code:" << code << ", strerror:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl;
cout << "sql:[" << cxt_.pSql << "] qParseQuerySql code:" << code << ", strerror:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl;
return false;
}
......@@ -123,6 +123,12 @@ private:
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
nodesStringToNode(req.ast, &pCxt->pAstRoot);
pCxt->streamQuery = true;
} else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) {
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot;
pCxt->pAstRoot = pStmt->pQuery;
pCxt->streamQuery = true;
pCxt->triggerType = pStmt->pOptions->triggerType;
pCxt->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
} else {
pCxt->pAstRoot = pQuery->pRoot;
}
......@@ -353,11 +359,11 @@ TEST_F(PlannerTest, createTopic) {
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, stream) {
TEST_F(PlannerTest, createStream) {
setDatabase("root", "test");
bind("SELECT sum(c1) FROM st1");
ASSERT_TRUE(run(true));
bind("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 interval(10s)");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, createSmaIndex) {
......
......@@ -2735,10 +2735,12 @@ void schedulerDestroy(void) {
if (schMgmt.jobRef) {
SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
int64_t refId = 0;
while (pJob) {
refId = pJob->refId;
if (refId == 0) {
break;
}
taosRemoveRef(schMgmt.jobRef, pJob->refId);
pJob = taosIterateRef(schMgmt.jobRef, refId);
......
......@@ -58,4 +58,4 @@ python3 ./test.py $1 -f client/client.py
python3 ./test.py $1 -s && sleep 1
# connector
python3 ./test.py $1 -f connector/lua.py
# python3 ./test.py $1 -f connector/lua.py
......@@ -12,7 +12,8 @@ fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]; do
echo kill -9 $PID
pkill -9 taosd
#pkill -9 taosd
kill -9 $PID
echo "Killing processes locking on port 6030"
if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6030
......
......@@ -350,4 +350,6 @@ sql_error alter database db precision 'ns'
sql_error alter database db precision 'ys'
sql_error alter database db prec 'xs'
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
......@@ -484,4 +484,6 @@ sql drop database db
sql_error create database db STREAM_MODE 2
sql_error create database db STREAM_MODE -1
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
......@@ -353,4 +353,4 @@ while $dbCnt < 2
endw
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -471,4 +471,4 @@ endi
# return -1
#endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -138,4 +138,4 @@ if $loop_test == 0 then
goto loop_test_pos
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -727,4 +727,4 @@ if $loop_test == 0 then
goto loop_test_pos
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -527,4 +527,4 @@ if $rows != 1 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -577,4 +577,4 @@ if $data00 != 33 then
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -688,4 +688,4 @@ if $rows != 1 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -318,4 +318,4 @@ endi
#sql select count(*) from car where ts > '2019-05-14 00:00:00' interval(1y, 5d)
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -180,4 +180,4 @@ print =============== clear
# return -1
#endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -481,4 +481,4 @@ if $loop_test == 0 then
goto loop_test_pos
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -347,4 +347,4 @@ if $loop_test == 0 then
goto loop_test_pos
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -212,4 +212,5 @@ if $rows != 3 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
......@@ -211,3 +211,6 @@ if $rows != 2 then
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
......@@ -76,4 +76,4 @@ if $data00 != 10000 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -196,4 +196,4 @@ $dbNamme = d1
sql create database $dbNamme vgroups 4
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -43,6 +43,35 @@ loop_vgroups:
print =============== create database $dbNamme vgroups $vgroups
sql create database $dbNamme vgroups $vgroups
sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
if $loop_cnt == 0 then
if $rows != 2 then
return -1
endi
if $data02 != 2 then # vgroups
print vgroups: $data02
return -1
endi
else
if $rows != 3 then
return -1
endi
if $data00 == d1 then
if $data02 != 4 then # vgroups
print vgroups: $data02
return -1
endi
else
if $data12 != 4 then # vgroups
print vgroups: $data12
return -1
endi
endi
endi
sql use $dbNamme
print =============== create super table
......@@ -233,4 +262,4 @@ if $loop_cnt == 0 then
goto loop_vgroups
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -204,27 +204,27 @@ print expectMsgCntFromStb: $expectMsgCntFromStb
#endi
$expect_result = @{consume success: @
$expect_result = $expect_result . $expectConsumeMsgCnt
$expect_result = $expect_result . $expectMsgCntFromStb
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0
print cmd result----> $system_content
if $system_content != success then
return -1
endi
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0
#print cmd result----> $system_content
##if $system_content != @{consume success: 10000, 0}@ then
#if $system_content != success then
# return -1
#endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 0
print cmd result----> $system_content
#if $system_content != @{consume success: 10000, 0}@ then
if $system_content != success then
......@@ -237,4 +237,4 @@ if $loop_cnt == 0 then
goto loop_vgroups
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -232,4 +232,4 @@ if $loop_cnt == 0 then
goto loop_vgroups
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -237,4 +237,4 @@ if $loop_cnt == 0 then
goto loop_vgroups
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -210,4 +210,4 @@ if $loop_cnt == 0 then
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -239,4 +239,4 @@ if $loop_cnt == 0 then
goto loop_vgroups
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -221,4 +221,4 @@ if $loop_cnt == 0 then
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -261,4 +261,4 @@ if $loop_cnt == 0 then
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -39,6 +39,7 @@ sql connect
$loop_cnt = 0
$vgroups = 1
$dbNamme = d0
loop_vgroups:
print =============== create database $dbNamme vgroups $vgroups
sql create database $dbNamme vgroups $vgroups
......@@ -48,15 +49,15 @@ print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $d
print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
if $loop_cnt == 0 then
if $rows != 2 then
if $rows != 3 then
return -1
endi
if $data02 != 1 then # vgroups
if $data22 != 1 then # vgroups
print vgroups: $data02
return -1
endi
else
if $rows != 3 then
if $rows != 4 then
return -1
endi
if $data00 == d1 then
......@@ -153,7 +154,7 @@ endw
# -g showMsgFlag, default is 0
#
$consumeDelay = 2
$consumeDelay = 3
$expectMsgCntFromCtb = $rowNum
$expectMsgCntFromStb = $rowNum * $tbNum
......@@ -179,8 +180,18 @@ $expect_result = $expect_result . $totalMsgCntOfmultiTopics
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 2
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 2
$check_mode = 0
if $loop_cnt == 0 then
$check_mode = 0
else
$check_mode = 2
endi
$expectMsgCntFromStb0 = 2001
$expectMsgCntFromStb1 = 2001
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb0 -m1 $expectMsgCntFromStb1 -j 2
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb0 -m1 $expectMsgCntFromStb1 -j 2
print cmd result----> $system_content
if $system_content != success then
return -1
......@@ -237,4 +248,4 @@ if $loop_cnt == 0 then
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册