diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index fb0c1508cb051c577fed9bde534d55e297822fe2..f620eb4dd55754146ddd164d372bb1233cf94aba 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -294,7 +294,7 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) { +void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start) { if (tsEnableStream == 0) { return NULL; } @@ -326,7 +326,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch pObj->rid = taosAddRef(cqObjRef, pObj); - cqCreateStream(pContext, pObj); + if(start && pContext->master) { + cqCreateStream(pContext, pObj); + } else { + pObj->pContext = pContext; + } rid = pObj->rid; diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index f378835f0a87f9dd3539d92ac052c81cabcc165d..b1153397bac9cf9b2bb1f209b2765e2fe25f8244 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -70,7 +70,7 @@ int main(int argc, char *argv[]) { tdDestroyTSchemaBuilder(&schemaBuilder); for (int sid =1; sid<10; ++sid) { - cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); + cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema, 1); } tdFreeSchema(pSchema); diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 1941649d0a02e7c75f4781cad5dedfa185127752..552a40665ae10c72203ac60afaec59a4381ead5f 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -42,7 +42,7 @@ void cqStart(void *handle); void cqStop(void *handle); // cqCreate is called by TSDB to start an instance of CQ -void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema); +void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start); // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate void cqDrop(void *handle); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 495bfa2384ae0680577775c83f63d098ebf4585f..85ee9f0443bcab328cd086f32723f0599bd4c353 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -51,7 +51,7 @@ typedef struct { void *cqH; int (*notifyStatus)(void *, int status, int eno); int (*eventCallBack)(void *); - void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema); + void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start); void (*cqDropFunc)(void *handle); } STsdbAppH; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 8969f61596b33f37cabe934fdd56819b71575315..99929f3542160cc53b99571f73d699e1abcbf171 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -526,7 +526,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql, - tsdbGetTableSchemaImpl(pTable, false, false, -1)); + tsdbGetTableSchemaImpl(pTable, false, false, -1), 0); } } } @@ -619,4 +619,4 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { tsdbDestroyReadH(&readh); return 0; -} \ No newline at end of file +} diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 824f69608e8c9d2d00e3219d54b484304827e8ae..3e6263b9d323f367ae0c10522dac1240eaf4d70f 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -840,7 +840,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1; if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) { pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql, - tsdbGetTableSchemaImpl(pTable, false, false, -1)); + tsdbGetTableSchemaImpl(pTable, false, false, -1), 1); } tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), @@ -1322,4 +1322,4 @@ static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema) { } return 0; -} \ No newline at end of file +}