提交 661fbd2e 编写于 作者: L Liu Jicong

enh(stream): improve topic show

上级 68b9424f
......@@ -83,7 +83,7 @@ int32_t create_stream() {
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(
pConn,
"create stream stream1 trigger window_close into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
"create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
"from tu1 interval(10m)");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
......
......@@ -221,15 +221,12 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
#if 0
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
#endif
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
......@@ -240,6 +237,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t {
......@@ -268,12 +266,9 @@ DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
#endif
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql);
#endif
/* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
#endif
......
......@@ -86,10 +86,9 @@ extern const int32_t TYPE_BYTES[15];
#define TS_PATH_DELIMITER "."
#define TS_ESCAPE_CHAR '`'
#define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TIME_PRECISION_HOURS 3
#define TSDB_TIME_PRECISION_MINUTES 4
#define TSDB_TIME_PRECISION_SECONDS 5
......@@ -249,7 +248,6 @@ typedef enum ELogicConditionType {
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_SHOW_LIST_LEN 1000
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
......@@ -376,9 +374,9 @@ typedef enum ELogicConditionType {
* 1. ordinary sub query for select * from super_table
* 2. all sqlobj generated by createSubqueryObj with this flag
*/
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
......
......@@ -395,7 +395,7 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
}
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
tmq_list_append(*topics, topic->topicName);
tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
}
return TMQ_RESP_ERR__SUCCESS;
}
......
......@@ -14,9 +14,9 @@
*/
#include "systable.h"
#include "taos.h"
#include "tdef.h"
#include "types.h"
#include "taos.h"
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
......@@ -264,7 +264,7 @@ static const SSysDbTableSchema consumerSchema[] = {
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
......
......@@ -33,6 +33,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
#ifdef __cplusplus
}
#endif
......
......@@ -790,71 +790,83 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
if (pShow->pIter == NULL) break;
SColumnInfoData *pColInfo;
int32_t cols = 0;
taosRLockLatch(&pConsumer->lock);
// consumer id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
// group id
char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN);
varDataSetLen(groupId, strlen(varDataVal(groupId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)groupId, false);
// app id
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
varDataSetLen(appId, strlen(varDataVal(appId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
varDataSetLen(status, strlen(varDataVal(status)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
// subscribed topics
// TODO: split into multiple rows
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
char *showStr = taosShowStrArray(pConsumer->assignedTopics);
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
taosMemoryFree(showStr);
varDataSetLen(topics, strlen(varDataVal(topics)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topics, false);
// pid
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
// end point
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
// up time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
bool hasTopic = true;
if (topicSz == 0) {
hasTopic = false;
topicSz = 1;
}
for (int32_t i = 0; i < topicSz; i++) {
if (numOfRows + topicSz > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
}
SColumnInfoData *pColInfo;
int32_t cols = 0;
// consumer id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
// group id
char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN);
varDataSetLen(groupId, strlen(varDataVal(groupId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)groupId, false);
// app id
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
varDataSetLen(appId, strlen(varDataVal(appId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
varDataSetLen(status, strlen(varDataVal(status)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
// one subscribed topic
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (hasTopic) {
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN);
varDataSetLen(topic, strlen(varDataVal(topic)));
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
} else {
colDataAppend(pColInfo, numOfRows, NULL, true);
}
// pid
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
// end point
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
// up time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
numOfRows++;
}
taosRUnLockLatch(&pConsumer->lock);
sdbRelease(pSdb, pConsumer);
numOfRows++;
}
pShow->numOfRows += numOfRows;
......
......@@ -259,7 +259,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1;
}
#if 1
#if 0
printf("|");
for (int i = 0; i < pStream->outputSchema.nCols; i++) {
printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name);
......
......@@ -61,6 +61,11 @@ int32_t mndInitTopic(SMnode *pMnode) {
void mndCleanupTopic(SMnode *pMnode) {}
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
//
return strchr(topic, '.') + 1;
}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -55,15 +55,15 @@ int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
}
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
int code = 0;
int ret = 0;
TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
TdFilePtr pLogTFile = pRead->pReadLogTFile;
// seek position
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
code = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
if (code < 0) {
ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -72,14 +72,14 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
// TODO:deserialize
ASSERT(entry.ver == ver);
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (code < 0) {
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return code;
return ret;
}
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
......@@ -108,7 +108,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
}
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
int code;
SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) {
return 0;
......@@ -126,16 +125,15 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
if (pRead->curFileFirstVer != pRet->firstVer) {
code = walReadChangeFile(pRead, pRet->firstVer);
if (code < 0) {
if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
return -1;
}
}
code = walReadSeekFilePos(pRead, pRet->firstVer, ver);
if (code < 0) {
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
return -1;
}
pRead->curVersion = ver;
return 0;
......@@ -246,8 +244,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
// TODO: check wal life
if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if (code < 0) {
if (walReadSeekVer(pRead, ver) < 0) {
return -1;
}
}
......@@ -278,8 +275,12 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead->capacity = pRead->pHead->head.bodyLen;
}
if (pRead->pHead->head.bodyLen !=
taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) {
if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
......
......@@ -33,7 +33,7 @@ if $rows != 3 then
return -1
endi
sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
sql create stream s1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m)
sql show stables
if $rows != 2 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册