diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index a1b6174de0d21f02fdde1e367bcb45901707c6ba..748a9b299626c285e5089af848bed9e0ad26e0a8 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -382,6 +382,7 @@ typedef struct SSqlObj { typedef struct SSqlStream { SSqlObj *pSql; + const char* dstTable; uint32_t streamId; char listed; bool isProject; @@ -408,6 +409,8 @@ typedef struct SSqlStream { struct SSqlStream *prev, *next; } SSqlStream; +void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); + int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn); void tscInitMsgsFp(); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 18fc79c4748f84d4ec073e4bc678e3a067afb025..f813ff85d99e6642827a49defe6b96f29720fc57 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -262,6 +262,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { SSqlStream *pStream = pObj->streamList; while (pStream) { tstrncpy(pSdesc->sql, pStream->pSql->sqlstr, sizeof(pSdesc->sql)); + if (pStream->dstTable == NULL) { + pSdesc->dstTable[0] = 0; + } else { + tstrncpy(pSdesc->dstTable, pStream->dstTable, sizeof(pSdesc->dstTable)); + } pSdesc->streamId = htonl(pStream->streamId); pSdesc->num = htobe64(pStream->num); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 68c3bcae165050863cc4bf9c92a1510581531c3a..74b8e4d95878d8a784d4d82e28028e94bd32207b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -535,6 +535,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr); } +void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) { + pStream->dstTable = dstTable; +} + TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index efb8795962e769dd0b11b8d313177855c994f52d..3968d5b8c9dfbb53ad04179e0451fc9aab76980e 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -57,6 +57,7 @@ typedef struct SCqObj { uint64_t uid; int32_t tid; // table ID int32_t rowSize; // bytes of a row + char * dstTable; char * sqlStr; // SQL string STSchema * pSchema; // pointer to schema array void * pStream; @@ -185,7 +186,7 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) { +void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) { if (tsEnableStream == 0) { return NULL; } @@ -195,9 +196,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema * if (pObj == NULL) return NULL; pObj->uid = uid; - pObj->tid = tid; - pObj->sqlStr = malloc(strlen(sqlStr)+1); - strcpy(pObj->sqlStr, sqlStr); + pObj->tid = sid; + if (dstTable != NULL) { + pObj->dstTable = strdup(dstTable); + } + pObj->sqlStr = strdup(sqlStr); pObj->pSchema = tdDupSchema(pSchema); pObj->rowSize = schemaTLen(pSchema); @@ -247,6 +250,7 @@ void cqDrop(void *handle) { cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); tdFreeSchema(pObj->pSchema); + free(pObj->dstTable); free(pObj->sqlStr); free(pObj); @@ -292,6 +296,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { if (pObj->pStream == NULL) { pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); if (pObj->pStream) { + tscSetStreamDestTable(pObj->pStream, pObj->dstTable); pContext->num++; cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); } else { diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index 41380f0d86ed31b2466596993b8ce1a1d435f8cf..f378835f0a87f9dd3539d92ac052c81cabcc165d 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -70,7 +70,7 @@ int main(int argc, char *argv[]) { tdDestroyTSchemaBuilder(&schemaBuilder); for (int sid =1; sid<10; ++sid) { - cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); + cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); } tdFreeSchema(pSchema); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index e8e302924490eb0ebcccf10468c3469107fe6dfe..6404f12034cb42964536f08778287f5341c90335 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -790,6 +790,7 @@ typedef struct { typedef struct { char sql[TSDB_SHOW_SQL_LEN]; + char dstTable[TSDB_TABLE_NAME_LEN]; uint32_t streamId; int64_t num; // number of computing/cycles int64_t useconds; diff --git a/src/inc/tcq.h b/src/inc/tcq.h index afa744a9c4856521075fb9427d0ed1bb412aa7d5..ad123d4080e23d2c7933dcea2981fd2376b99afe 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -42,7 +42,7 @@ void cqStart(void *handle); void cqStop(void *handle); // cqCreate is called by TSDB to start an instance of CQ -void *cqCreate(void *handle, uint64_t uid, int32_t sid, char *sqlStr, STSchema *pSchema); +void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema); // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate void cqDrop(void *handle); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 58859f42bc80daa3317d789950c1625c1533cf5f..42100438fd016a406d4cd4e9690092a15cb7284f 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -48,7 +48,7 @@ typedef struct { void *cqH; int (*notifyStatus)(void *, int status, int eno); int (*eventCallBack)(void *); - void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema); + void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema); void (*cqDropFunc)(void *handle); } STsdbAppH; diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 36b6ff7a59fc1f1c73e64c40ff3a5a1cef65ece6..3256d5cd59699139cf67cebcc41070ca99cffad6 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -450,6 +450,12 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "dest table"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); @@ -524,6 +530,10 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->dstTable, pShow->bytes[cols]); + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 3990c0c516d1768135934b098efbcd7746ab424c..9d65325001a33dedbf04cf7d065d1d803eeb4e1c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -872,7 +872,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) { for (int i = 0; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql, tsdbGetTableSchemaImpl(pTable, false, false, -1)); } } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 25c815b74e3bd5593bef9157cbd42ba869298cab..9dfa147c8f35c20cea9b85e38e38a3db60f3572b 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -828,7 +828,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1; if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql, tsdbGetTableSchemaImpl(pTable, false, false, -1)); }