提交 7cde651b 编写于 作者: D dapan

Merge remote-tracking branch 'origin/2.0' into feature/qnode

......@@ -2,6 +2,8 @@ build/
compile_commands.json
.cache
.ycm_extra_conf.py
.tasks
.vimspector.json
.vscode/
.idea/
cmake-build-debug/
......
......@@ -28,6 +28,7 @@ endif(${BUILD_TEST})
add_subdirectory(source)
add_subdirectory(tools)
add_subdirectory(tests)
add_subdirectory(example)
# docs
add_subdirectory(docs)
......
aux_source_directory(src TMQ_DEMO_SRC)
add_executable(tmq ${TMQ_DEMO_SRC})
target_link_libraries(
tmq
PUBLIC taos
#PUBLIC util
#PUBLIC common
#PUBLIC os
)
target_include_directories(
tmq
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static void msg_process(tmq_message_t* message) {
tmqShowMsg(message);
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu2 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
const char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
taos_close(pConn);
return 0;
}
tmq_t* build_consumer() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
return tmq;
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
return NULL;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
return topic_list;
}
void basic_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
clock_t startTime = clock();
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
} else {
break;
}
}
clock_t endTime = clock();
printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t *rk,
tmq_list_t *topics) {
static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
tmq_resp_err_t err;
if ((err = tmq_subscribe(rk, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
return;
}
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(rk, 500);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
tmq_commit(rk, NULL, 0);
}
}
err = tmq_consumer_close(rk);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main() {
int code;
code = init_env();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
}
......@@ -92,17 +92,6 @@ typedef struct taosField {
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef struct tmq_message_topic_t tmq_message_topic_t;
typedef struct tmq_message_tb_t tmq_message_tb_t;
typedef struct tmq_tb_iter_t tmq_tb_iter_t;
typedef struct tmq_message_col_t tmq_message_col_t;
typedef struct tmq_col_iter_t tmq_col_iter_t;
typedef struct TAOS_BIND {
int buffer_type;
void * buffer;
......@@ -204,26 +193,66 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
DLL_EXPORT tmq_list_t* tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);
/* --------------------------TMQ INTERFACE------------------------------- */
enum tmq_resp_err_t {
TMQ_RESP_ERR__SUCCESS = 0,
TMQ_RESP_ERR__FAIL = 1,
};
typedef enum tmq_resp_err_t tmq_resp_err_t;
typedef struct tmq_t tmq_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
DLL_EXPORT tmq_conf_t* tmq_conf_new();
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message);
DLL_EXPORT const char* tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
#endif
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
#endif
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
DLL_EXPORT tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen);
enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
};
DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time);
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
DLL_EXPORT int32_t tmq_topic_num(tmq_message_t* msg);
DLL_EXPORT char* tmq_get_topic(tmq_message_topic_t* msg);
DLL_EXPORT int32_t tmq_get_vgId(tmq_message_topic_t* msg);
DLL_EXPORT tmq_message_tb_t* tmq_get_next_tb(tmq_message_topic_t* msg, tmq_tb_iter_t* iter);
DLL_EXPORT tmq_message_col_t* tmq_get_next_col(tmq_message_tb_t* msg, tmq_col_iter_t* iter);
//temporary used function for demo only
void tmqShowMsg(tmq_message_t* tmq_message);
#ifdef __cplusplus
}
......
......@@ -38,6 +38,10 @@
// int16_t bytes;
//} SSchema;
#define TMQ_REQ_TYPE_COMMIT_ONLY 0
#define TMQ_REQ_TYPE_CONSUME_ONLY 1
#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
......@@ -115,7 +119,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData data;
SColumnInfoData data = {0};
buf = taosDecodeFixedI16(buf, &data.info.colId);
buf = taosDecodeFixedI16(buf, &data.info.type);
buf = taosDecodeFixedI16(buf, &data.info.bytes);
......@@ -130,6 +134,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
int32_t tlen = 0;
int32_t sz = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI64(buf, pRsp->committedOffset);
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
if (pRsp->numOfTopics == 0) return tlen;
tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas);
if (pRsp->pBlockData) {
sz = taosArrayGetSize(pRsp->pBlockData);
......@@ -145,19 +155,58 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI64(buf, &pRsp->committedOffset);
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics);
if (pRsp->numOfTopics == 0) return buf;
pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper));
if (pRsp->schemas == NULL) return NULL;
buf = tDecodeSSchemaWrapper(buf, pRsp->schemas);
buf = taosDecodeFixedI32(buf, &sz);
pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock block;
SSDataBlock block = {0};
tDecodeDataBlock(buf, &block);
taosArrayPush(pRsp->pBlockData, &block);
}
return buf;
}
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
//int32_t numOfOutput = pBlock->info.numOfCols;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
for(int32_t i = 0; i < sz; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
//tfree(pBlock);
}
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
if (pRsp->schemas) {
if (pRsp->schemas->nCols) {
tfree(pRsp->schemas->pSchema);
}
free(pRsp->schemas);
}
taosArrayDestroyEx(pRsp->pBlockData, (void(*)(void*))tDeleteSSDataBlock);
pRsp->pBlockData = NULL;
//for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
//SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
//tDeleteSSDataBlock(pDataBlock);
//}
}
//======================================================================================================================
// the following structure shared by parser and executor
typedef struct SColumn {
......
......@@ -64,7 +64,6 @@ extern int8_t tsKeepOriginalColumnName;
extern int8_t tsDeadLockKillQuery;
// client
extern int32_t tsMaxSQLStringLen;
extern int32_t tsMaxWildCardsLen;
extern int32_t tsMaxRegexStringLen;
extern int8_t tsTscEnableRecordSql;
......
......@@ -147,6 +147,11 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
#define TD_SUPER_TABLE TSDB_SUPER_TABLE
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
typedef struct {
int32_t vgId;
char* dbFName;
......@@ -876,13 +881,21 @@ typedef struct {
char desc[TSDB_STEP_DESC_LEN];
} SStartupReq;
/**
* The layout of the query message payload is as following:
* +--------------------+---------------------------------+
* |Sql statement | Physical plan |
* |(denoted by sqlLen) |(In JSON, denoted by contentLen) |
* +--------------------+---------------------------------+
*/
typedef struct SSubQueryMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int8_t taskType;
uint32_t contentLen;
uint32_t sqlLen; // the query sql,
uint32_t phyLen;
char msg[];
} SSubQueryMsg;
......@@ -1146,10 +1159,7 @@ typedef struct SVCreateTbReq {
char* name;
uint32_t ttl;
uint32_t keep;
#define TD_SUPER_TABLE TSDB_SUPER_TABLE
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
uint8_t type;
uint8_t type;
union {
struct {
tb_uid_t suid;
......@@ -1194,15 +1204,21 @@ typedef struct {
} SVAlterTbRsp;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int64_t suid;
uint64_t ver;
char* name;
uint8_t type;
tb_uid_t suid;
} SVDropTbReq;
typedef struct {
SMsgHead head;
uint64_t ver;
} SVDropTbRsp;
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);
int32_t tSerializeSVDropTbRsp(void** buf, SVDropTbRsp* pRsp);
void* tDeserializeSVDropTbRsp(void* buf, SVDropTbRsp* pRsp);
typedef struct {
SMsgHead head;
int64_t uid;
......@@ -1562,6 +1578,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
}
typedef struct SMqSetCVgReq {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
......@@ -1578,7 +1595,8 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg*
tlen += taosEncodeFixedU64(buf, pMsg->sId);
tlen += taosEncodeFixedU64(buf, pMsg->queryId);
tlen += taosEncodeFixedU64(buf, pMsg->taskId);
tlen += taosEncodeFixedU32(buf, pMsg->contentLen);
tlen += taosEncodeFixedU32(buf, pMsg->sqlLen);
tlen += taosEncodeFixedU32(buf, pMsg->phyLen);
//tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
return tlen;
}
......@@ -1587,13 +1605,15 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
buf = taosDecodeFixedU64(buf, &pMsg->sId);
buf = taosDecodeFixedU64(buf, &pMsg->queryId);
buf = taosDecodeFixedU64(buf, &pMsg->taskId);
buf = taosDecodeFixedU32(buf, &pMsg->contentLen);
buf = taosDecodeFixedU32(buf, &pMsg->sqlLen);
buf = taosDecodeFixedU32(buf, &pMsg->phyLen);
//buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
return buf;
}
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
......@@ -1602,12 +1622,13 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, (char*)pReq->qmsg);
tlen += taosEncodeString(buf, pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
......@@ -1616,7 +1637,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeString(buf, (char**)&pReq->qmsg);
buf = taosDecodeString(buf, &pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
}
......@@ -1692,6 +1713,10 @@ typedef struct SMqTopicBlk {
typedef struct SMqConsumeRsp {
int64_t consumerId;
SSchemaWrapper* schemas;
int64_t committedOffset;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t numOfTopics;
SArray* pBlockData; //SArray<SSDataBlock>
} SMqConsumeRsp;
......@@ -1725,10 +1750,15 @@ typedef struct SMqSubTopicEp {
typedef struct SMqCMGetSubEpRsp {
int64_t consumerId;
int64_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
taosArrayDestroy(pSubTopicEp->vgs);
}
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
......@@ -1742,6 +1772,10 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
return buf;
}
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp);
}
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
......@@ -1773,6 +1807,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI64(buf, pRsp->epoch);
tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t sz = taosArrayGetSize(pRsp->topics);
tlen += taosEncodeFixedI32(buf, sz);
......@@ -1785,6 +1820,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI64(buf, &pRsp->epoch);
buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
......
......@@ -207,36 +207,78 @@
#define TK_INTO 189
#define TK_VALUES 190
#define NEW_TK_UNION 1
#define NEW_TK_ALL 2
#define NEW_TK_MINUS 3
#define NEW_TK_EXCEPT 4
#define NEW_TK_INTERSECT 5
#define NEW_TK_NK_PLUS 6
#define NEW_TK_NK_MINUS 7
#define NEW_TK_NK_STAR 8
#define NEW_TK_NK_SLASH 9
#define NEW_TK_SHOW 10
#define NEW_TK_DATABASES 11
#define NEW_TK_NK_ID 12
#define NEW_TK_NK_LP 13
#define NEW_TK_NK_RP 14
#define NEW_TK_NK_COMMA 15
#define NEW_TK_NK_LITERAL 16
#define NEW_TK_NK_DOT 17
#define NEW_TK_SELECT 18
#define NEW_TK_DISTINCT 19
#define NEW_TK_AS 20
#define NEW_TK_FROM 21
#define NEW_TK_WITH 22
#define NEW_TK_RECURSIVE 23
#define NEW_TK_ORDER 24
#define NEW_TK_BY 25
#define NEW_TK_ASC 26
#define NEW_TK_DESC 27
#define NEW_TK_NULLS 28
#define NEW_TK_FIRST 29
#define NEW_TK_LAST 30
#define NEW_TK_OR 1
#define NEW_TK_AND 2
#define NEW_TK_UNION 3
#define NEW_TK_ALL 4
#define NEW_TK_MINUS 5
#define NEW_TK_EXCEPT 6
#define NEW_TK_INTERSECT 7
#define NEW_TK_NK_PLUS 8
#define NEW_TK_NK_MINUS 9
#define NEW_TK_NK_STAR 10
#define NEW_TK_NK_SLASH 11
#define NEW_TK_NK_REM 12
#define NEW_TK_SHOW 13
#define NEW_TK_DATABASES 14
#define NEW_TK_NK_INTEGER 15
#define NEW_TK_NK_FLOAT 16
#define NEW_TK_NK_STRING 17
#define NEW_TK_NK_BOOL 18
#define NEW_TK_TIMESTAMP 19
#define NEW_TK_NK_VARIABLE 20
#define NEW_TK_NK_COMMA 21
#define NEW_TK_NK_ID 22
#define NEW_TK_NK_LP 23
#define NEW_TK_NK_RP 24
#define NEW_TK_NK_DOT 25
#define NEW_TK_BETWEEN 26
#define NEW_TK_NOT 27
#define NEW_TK_IS 28
#define NEW_TK_NULL 29
#define NEW_TK_NK_LT 30
#define NEW_TK_NK_GT 31
#define NEW_TK_NK_LE 32
#define NEW_TK_NK_GE 33
#define NEW_TK_NK_NE 34
#define NEW_TK_NK_EQ 35
#define NEW_TK_LIKE 36
#define NEW_TK_MATCH 37
#define NEW_TK_NMATCH 38
#define NEW_TK_IN 39
#define NEW_TK_FROM 40
#define NEW_TK_AS 41
#define NEW_TK_JOIN 42
#define NEW_TK_ON 43
#define NEW_TK_INNER 44
#define NEW_TK_SELECT 45
#define NEW_TK_DISTINCT 46
#define NEW_TK_WHERE 47
#define NEW_TK_PARTITION 48
#define NEW_TK_BY 49
#define NEW_TK_SESSION 50
#define NEW_TK_STATE_WINDOW 51
#define NEW_TK_INTERVAL 52
#define NEW_TK_SLIDING 53
#define NEW_TK_FILL 54
#define NEW_TK_VALUE 55
#define NEW_TK_NONE 56
#define NEW_TK_PREV 57
#define NEW_TK_LINEAR 58
#define NEW_TK_NEXT 59
#define NEW_TK_GROUP 60
#define NEW_TK_HAVING 61
#define NEW_TK_ORDER 62
#define NEW_TK_SLIMIT 63
#define NEW_TK_SOFFSET 64
#define NEW_TK_LIMIT 65
#define NEW_TK_OFFSET 66
#define NEW_TK_NK_LR 67
#define NEW_TK_ASC 68
#define NEW_TK_DESC 69
#define NEW_TK_NULLS 70
#define NEW_TK_FIRST 71
#define NEW_TK_LAST 72
#define TK_SPACE 300
#define TK_COMMENT 301
......@@ -247,6 +289,8 @@
#define TK_FILE 306
#define TK_QUESTION 307 // denoting the placeholder of "?",when invoking statement bind query
#define TK_NIL 65535
#endif
......@@ -27,6 +27,10 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
typedef struct SReadHandle {
void* reader;
void* meta;
} SReadHandle;
/**
* Create the exec task for streaming mode
* @param pMsg
......@@ -35,7 +39,13 @@ struct SSubplan;
*/
qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
/**
*
* @param tinfo
* @param input
* @return
*/
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input);
/**
* Create the exec task object according to task json
......@@ -46,7 +56,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
* @param qId
* @return
*/
int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
/**
* The main task execution function, including query on both table and multiple tables,
......
......@@ -107,14 +107,14 @@ typedef struct SPoint1 {
union{double val; char* ptr;};
} SPoint1;
struct SQLFunctionCtx;
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
//for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SExtTagsInfo {
int16_t tagsLen; // keep the tags data for top/bottom query result
int16_t numOfTagCols;
struct SQLFunctionCtx **pTagCtxList;
struct SqlFunctionCtx **pTagCtxList;
} SExtTagsInfo;
typedef struct SResultDataInfo {
......@@ -126,18 +126,18 @@ typedef struct SResultDataInfo {
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
typedef struct SFunctionFpSet {
bool (*init)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*addInput)(struct SQLFunctionCtx *pCtx);
bool (*init)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*addInput)(struct SqlFunctionCtx *pCtx);
// finalizer must be called after all exec has been executed to generated final result.
void (*finalize)(struct SQLFunctionCtx *pCtx);
void (*combine)(struct SQLFunctionCtx *pCtx);
void (*finalize)(struct SqlFunctionCtx *pCtx);
void (*combine)(struct SqlFunctionCtx *pCtx);
} SFunctionFpSet;
extern SFunctionFpSet fpSet[1];
// sql function runtime context
typedef struct SQLFunctionCtx {
typedef struct SqlFunctionCtx {
int32_t size; // number of rows
void * pInput; // input data buffer
uint32_t order; // asc|desc
......@@ -167,7 +167,7 @@ typedef struct SQLFunctionCtx {
int32_t columnIndex;
SFunctionFpSet* fpSet;
} SQLFunctionCtx;
} SqlFunctionCtx;
enum {
TEXPR_NODE_DUMMY = 0x0,
......@@ -216,14 +216,14 @@ typedef struct SAggFunctionInfo {
int8_t sFunctionId; // Transfer function for super table query
uint16_t status;
bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*addInput)(SQLFunctionCtx *pCtx);
bool (*init)(SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*addInput)(SqlFunctionCtx *pCtx);
// finalizer must be called after all exec has been executed to generated final result.
void (*finalize)(SQLFunctionCtx *pCtx);
void (*combine)(SQLFunctionCtx *pCtx);
void (*finalize)(SqlFunctionCtx *pCtx);
void (*combine)(SqlFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo;
struct SScalarFuncParam;
......@@ -279,9 +279,9 @@ void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc);
tExprNode* exprdup(tExprNode* pTree);
void resetResultRowEntryResult(SQLFunctionCtx* pCtx, int32_t num);
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
int32_t getNumOfResult(SQLFunctionCtx* pCtx, int32_t num);
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num);
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);
......
......@@ -22,7 +22,7 @@ extern "C" {
#include "nodes.h"
struct SQLFunctionCtx;
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
struct STimeWindow;
......@@ -32,9 +32,9 @@ typedef struct SFuncExecEnv {
typedef void* FuncMgtHandle;
typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SQLFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SQLFunctionCtx *pCtx);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
typedef struct SFuncExecFuncs {
FExecGetEnv getEnv;
......
......@@ -24,7 +24,6 @@
#endif
OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan)
......@@ -48,3 +47,5 @@ OP_ENUM_MACRO(AllTimeWindow)
OP_ENUM_MACRO(AllMultiTableTimeInterval)
OP_ENUM_MACRO(Order)
OP_ENUM_MACRO(Exchange)
//OP_ENUM_MACRO(TableScan)
......@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......@@ -80,7 +80,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** pJob);
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob);
/**
* Fetch query result from the remote query executor
......
......@@ -22,6 +22,19 @@ extern "C" {
#include "tdef.h"
#define nodeType(nodeptr) (((const SNode*)(nodeptr))->type)
#define setNodeType(nodeptr, type) (((SNode*)(nodeptr))->type = (type))
#define LIST_LENGTH(l) (NULL != (l) ? (l)->length : 0)
#define FOREACH(node, list) \
for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext)
#define FORBOTH(node1, list1, node2, list2) \
for (SListCell* cell1 = (NULL != (list1) ? (list1)->pHead : NULL), *cell2 = (NULL != (list2) ? (list2)->pHead : NULL); \
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \
cell1 = cell1->pNext, cell2 = cell2->pNext)
typedef enum ENodeType {
QUERY_NODE_COLUMN = 1,
QUERY_NODE_VALUE,
......@@ -38,6 +51,8 @@ typedef enum ENodeType {
QUERY_NODE_STATE_WINDOW,
QUERY_NODE_SESSION_WINDOW,
QUERY_NODE_INTERVAL_WINDOW,
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT,
......@@ -52,9 +67,6 @@ typedef struct SNode {
ENodeType type;
} SNode;
#define nodeType(nodeptr) (((const SNode*)(nodeptr))->type)
#define setNodeType(nodeptr, type) (((SNode*)(nodeptr))->type = (type))
typedef struct SListCell {
SNode* pNode;
struct SListCell* pNext;
......@@ -62,19 +74,10 @@ typedef struct SListCell {
typedef struct SNodeList {
int16_t length;
SListCell* pHeader;
SListCell* pHead;
SListCell* pTail;
} SNodeList;
#define LIST_LENGTH(l) (NULL != (l) ? (l)->length : 0)
#define FOREACH(node, list) \
for (SListCell* cell = (NULL != (list) ? (list)->pHeader : NULL); (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext)
#define FORBOTH(node1, list1, node2, list2) \
for (SListCell* cell1 = (NULL != (list1) ? (list1)->pHeader : NULL), *cell2 = (NULL != (list2) ? (list2)->pHeader : NULL); \
(NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \
cell1 = cell1->pNext, cell2 = cell2->pNext)
typedef struct SDataType {
uint8_t type;
uint8_t precision;
......@@ -156,14 +159,19 @@ typedef struct SLogicConditionNode {
typedef struct SIsNullCondNode {
ENodeType type; // QUERY_NODE_IS_NULL_CONDITION
SNode* pExpr;
bool isNot;
bool isNull;
} SIsNullCondNode;
typedef struct SNodeListNode {
ENodeType type; // QUERY_NODE_NODE_LIST
SNodeList* pNodeList;
} SNodeListNode;
typedef struct SFunctionNode {
SExprNode type; // QUERY_NODE_FUNCTION
char functionName[TSDB_FUNC_NAME_LEN];
int32_t funcId;
SNodeList* pParameterList; // SNode
SNodeList* pParameterList;
} SFunctionNode;
typedef struct STableNode {
......@@ -241,24 +249,41 @@ typedef struct SSessionWindowNode {
typedef struct SIntervalWindowNode {
ENodeType type; // QUERY_NODE_INTERVAL_WINDOW
int64_t interval;
int64_t sliding;
int64_t offset;
SNode* pInterval; // SValueNode
SNode* pOffset; // SValueNode
SNode* pSliding; // SValueNode
SNode* pFill;
} SIntervalWindowNode;
typedef enum EFillMode {
FILL_MODE_NONE = 1,
FILL_MODE_VALUE,
FILL_MODE_PREV,
FILL_MODE_NULL,
FILL_MODE_LINEAR,
FILL_MODE_NEXT
} EFillMode;
typedef struct SFillNode {
ENodeType type; // QUERY_NODE_FILL
EFillMode mode;
SNode* pValues; // SNodeListNode
} SFillNode;
typedef struct SSelectStmt {
ENodeType type; // QUERY_NODE_SELECT_STMT
bool isDistinct;
bool isStar;
SNodeList* pProjectionList; // SNode
SNode* pFromTable;
SNode* pWhereCond;
SNode* pWhere;
SNodeList* pPartitionByList; // SNode
SNode* pWindowClause;
SNode* pWindow;
SNodeList* pGroupByList; // SGroupingSetNode
SNode* pHaving;
SNodeList* pOrderByList; // SOrderByExprNode
SLimitNode limit;
SLimitNode slimit;
SNode* pLimit;
SNode* pSlimit;
} SSelectStmt;
typedef enum ESetOperatorType {
......@@ -272,10 +297,17 @@ typedef struct SSetOperator {
SNode* pRight;
} SSetOperator;
SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
SNodeList* nodesMakeList();
SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode);
void nodesDestroyList(SNodeList* pList);
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
bool nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext);
bool nodesWalkNodeList(SNodeList* pNodeList, FQueryNodeWalker walker, void* pContext);
bool nodesWalkList(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
bool nodesWalkStmt(SNode* pNode, FQueryNodeWalker walker, void* pContext);
......@@ -289,10 +321,6 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode);
bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
void nodesDestroyNodeList(SNodeList* pList);
#ifdef __cplusplus
}
#endif
......
......@@ -30,4 +30,4 @@ static const int32_t endian_test_var = 1;
}
#endif
#endif /*_TD_OS_ENDIAN_H_*/
\ No newline at end of file
#endif /*_TD_OS_ENDIAN_H_*/
......@@ -36,25 +36,25 @@ extern "C" {
#else
#define TSWAP(a, b, c) \
do { \
typeof(a) __tmp = (a); \
(a) = (b); \
(b) = __tmp; \
#define TSWAP(a, b, c) \
do { \
__typeof(a) __tmp = (a); \
(a) = (b); \
(b) = __tmp; \
} while (0)
#define TMAX(a, b) \
({ \
typeof(a) __a = (a); \
typeof(b) __b = (b); \
(__a > __b) ? __a : __b; \
#define TMAX(a, b) \
({ \
__typeof(a) __a = (a); \
__typeof(b) __b = (b); \
(__a > __b) ? __a : __b; \
})
#define TMIN(a, b) \
({ \
typeof(a) __a = (a); \
typeof(b) __b = (b); \
(__a < __b) ? __a : __b; \
#define TMIN(a, b) \
({ \
__typeof(a) __a = (a); \
__typeof(b) __b = (b); \
(__a < __b) ? __a : __b; \
})
#endif
......
......@@ -254,6 +254,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6)
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E7)
// dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
......
......@@ -30,6 +30,16 @@ extern "C" {
#include "tmsgtype.h"
#include "trpc.h"
#include "query.h"
#include "parser.h"
#define CHECK_CODE_GOTO(expr, label) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
terrno = code; \
goto label; \
} \
} while (0)
#define HEARTBEAT_INTERVAL 1500 // ms
......@@ -219,6 +229,11 @@ void *doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery);
// --- heartbeat
// global, called by mgmt
int hbMgrInit();
......@@ -227,7 +242,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
void appHbMgrCleanup(void);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType);
......
......@@ -384,6 +384,7 @@ static void hbStopThread() {
}
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
return NULL;
hbMgrInit();
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) {
......@@ -425,28 +426,23 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
return pAppHbMgr;
}
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
if (NULL == pAppHbMgr) {
return;
}
void appHbMgrCleanup(void) {
pthread_mutex_lock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == pTarget) {
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL;
}
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL;
}
pthread_mutex_unlock(&clientHbMgr.lock);
}
int hbMgrInit() {
return 0;
// init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
......@@ -464,6 +460,7 @@ int hbMgrInit() {
}
void hbMgrCleanUp() {
return;
hbStopThread();
// destroy all appHbMgr
......@@ -471,6 +468,7 @@ void hbMgrCleanUp() {
if (old == 0) return;
pthread_mutex_lock(&clientHbMgr.lock);
appHbMgrCleanup();
taosArrayDestroy(clientHbMgr.appHbMgrs);
pthread_mutex_unlock(&clientHbMgr.lock);
......@@ -501,6 +499,7 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
}
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
return 0;
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0};
......@@ -530,9 +529,6 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
return;
}
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
if (atomic_load_32(&pAppHbMgr->connKeyCnt) <= 0) {
appHbMgrCleanup(pAppHbMgr);
}
}
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
......
#include "../../libs/scheduler/inc/schedulerInt.h"
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
......@@ -13,15 +12,6 @@
#include "tpagedfile.h"
#include "tref.h"
#define CHECK_CODE_GOTO(expr, label) \
do { \
int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
terrno = code; \
goto label; \
} \
} while (0)
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
......@@ -119,7 +109,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p, key);
/*p->pAppHbMgr = appHbMgrInit(p, key);*/
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p;
......@@ -239,9 +229,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
if (code != TSDB_CODE_SUCCESS) {
// handle error and retry
} else {
......@@ -255,524 +246,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
return pRequest->code;
}
return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
}
typedef struct SMqClientVg {
// statistics
int64_t pollCnt;
// offset
int64_t committedOffset;
int64_t currentOffset;
//connection info
int32_t vgId;
SEpSet epSet;
} SMqClientVg;
typedef struct SMqClientTopic {
// subscribe info
int32_t sqlLen;
char* sql;
char* topicName;
int64_t topicId;
int32_t nextVgIdx;
SArray* vgs; //SArray<SMqClientVg>
} SMqClientTopic;
typedef struct tmq_resp_err_t {
int32_t code;
} tmq_resp_err_t;
typedef struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
int64_t commitOffset;
} tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
} tmq_topic_vgroup_list_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
struct tmq_conf_t {
char clientId[256];
char groupId[256];
char* ip;
uint16_t port;
tmq_commit_cb* commit_cb;
};
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
return conf;
}
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value);
}
if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value);
}
return 0;
}
struct tmq_t {
char groupId[256];
char clientId[256];
int64_t consumerId;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
SArray* clientTopics; //SArray<SMqClientTopic>
};
tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
if (pTmq == NULL) {
return NULL;
}
pTmq->pTscObj = (STscObj*)conn;
pTmq->status = 0;
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->commit_cb = conf->commit_cb;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
return pTmq;
}
struct tmq_list_t {
int32_t cnt;
int32_t tot;
char* elems[];
};
tmq_list_t* tmq_list_new() {
tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*));
if (ptr == NULL) {
return ptr;
}
ptr->cnt = 0;
ptr->tot = 8;
return ptr;
}
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
if (ptr->cnt >= ptr->tot-1) return -1;
ptr->elems[ptr->cnt] = strdup(src);
ptr->cnt++;
return 0;
}
int32_t tmq_null_cb(void* param, const SDataBuf* pMsg, int32_t code) {
if (code == 0) {
//
}
//
return 0;
}
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj *pRequest = NULL;
int32_t sz = topic_list->cnt;
//destroy ex
taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
SCMSubscribeReq req;
req.topicNum = sz;
req.consumerId = tmq->consumerId;
req.consumerGroup = strdup(tmq->groupId);
req.topicNames = taosArrayInit(sz, sizeof(void*));
for (int i = 0; i < sz; i++) {
char* topicName = topic_list->elems[i];
SName name = {0};
char* dbName = getDbOfConnection(tmq->pTscObj);
tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
tNameFromString(&name, topicName, T_NAME_TABLE);
char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN);
if (topicFname == NULL) {
}
tNameExtractFullName(&name, topicFname);
tscDebug("subscribe topic: %s", topicFname);
SMqClientTopic topic = {
.nextVgIdx = 0,
.sql = NULL,
.sqlLen = 0,
.topicId = 0,
.topicName = topicFname,
.vgs = NULL
};
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
taosArrayPush(tmq->clientTopics, &topic);
/*SMqClientTopic topic = {*/
/*.*/
/*};*/
taosArrayPush(req.topicNames, &topicFname);
}
int tlen = tSerializeSCMSubscribeReq(NULL, &req);
void* buf = malloc(tlen);
if(buf == NULL) {
goto _return;
}
void* abuf = buf;
tSerializeSCMSubscribeReq(&abuf, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
if (pRequest == NULL) {
tscError("failed to malloc sqlObj");
}
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
/*sendInfo->fp*/
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
_return:
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
}
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
conf->commit_cb = cb;
}
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
tmq_t* pTmq = (void*)param;
SArray* pArray = taosArrayInit(0, sizeof(SKv));
if (pArray == NULL) {
return NULL;
}
SKv kv = {0};
kv.key = HEARTBEAT_KEY_MQ_TMP;
SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
if (pMqHb == NULL) {
return pArray;
}
pMqHb->consumerId = connKey.connId;
SArray* clientTopics = pTmq->clientTopics;
int sz = taosArrayGetSize(clientTopics);
for (int i = 0; i < sz; i++) {
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
/*if (pCTopic->vgId == -1) {*/
/*pMqHb->status = 1;*/
/*break;*/
/*}*/
}
kv.value = pMqHb;
kv.valueLen = sizeof(SMqHbMsg);
taosArrayPush(pArray, &kv);
return pArray;
}
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* pTmq = malloc(sizeof(tmq_t));
if (pTmq == NULL) {
return NULL;
}
strcpy(pTmq->groupId, conf->groupId);
strcpy(pTmq->clientId, conf->clientId);
pTmq->pTscObj = (STscObj*)conn;
pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;
return pTmq;
}
TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj *pTscObj = (STscObj*)taos;
SRequestObj *pRequest = NULL;
SQueryNode *pQueryNode = NULL;
char *pStr = NULL;
terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || topicName == NULL || sql == NULL) {
tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return;
}
tscDebug("start to create topic, %s", topicName);
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
pQueryStmtInfo->info.continueQuery = true;
// todo check for invalid sql statement and return with error code
SSchema *schema = NULL;
int32_t numOfCols = 0;
CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId), _return);
pStr = qDagToString(pRequest->body.pDag);
if(pStr == NULL) {
goto _return;
}
printf("%s\n", pStr);
// The topic should be related to a database that the queried table is belonged to.
SName name = {0};
char dbName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName);
tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB);
tNameFromString(&name, topicName, T_NAME_TABLE);
char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
tNameExtractFullName(&name, topicFname);
SCMCreateTopicReq req = {
.name = (char*) topicFname,
.igExists = 1,
.physicalPlan = (char*) pStr,
.sql = (char*) sql,
.logicalPlan = "no logic plan",
};
int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
void* buf = malloc(tlen);
if(buf == NULL) {
goto _return;
}
void* abuf = buf;
tSerializeSCMCreateTopicReq(&abuf, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
_return:
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
}
/*typedef SMqConsumeRsp tmq_message_t;*/
struct tmq_message_t {
SMqConsumeRsp rsp;
};
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
SMqConsumeRsp rsp;
tDecodeSMqConsumeRsp(pMsg->pData, &rsp);
int32_t colNum = rsp.schemas->nCols;
for (int32_t i = 0; i < colNum; i++) {
printf("| %s |", rsp.schemas->pSchema[i].name);
}
printf("\n");
int32_t sz = taosArrayGetSize(rsp.pBlockData);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i);
int32_t rows = pDataBlock->info.rows;
for (int32_t j = 0; j < colNum; j++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, j);
for (int32_t k = 0; k < rows; k++) {
void* var = POINTER_SHIFT(pColInfoData->pData, k * pColInfoData->info.bytes);
if (j == 0) printf(" %ld ", *(int64_t*)var);
if (j == 1) printf(" %d ", *(int32_t*)var);
}
}
/*pDataBlock->*/
}
return 0;
}
int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = (tmq_t*)param;
if (code != 0) {
tsem_post(&tmq->rspSem);
return 0;
}
tscDebug("tmq ask ep cb called");
bool set = false;
SMqCMGetSubEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
int32_t sz = taosArrayGetSize(rsp.topics);
// TODO: lock
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i);
topic.topicName = strdup(pTopicEp->topic);
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
SMqClientVg clientVg = {
.pollCnt = 0,
.committedOffset = -1,
.currentOffset = -1,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet
};
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(tmq->clientTopics, &topic);
}
if(set) tmq->status = 1;
// unlock
tsem_post(&tmq->rspSem);
return 0;
}
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
if (taosArrayGetSize(tmq->clientTopics) == 0 || tmq->status == 0) {
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) {
tscError("failed to malloc get subscribe ep buf");
}
buf->consumerId = htobe64(tmq->consumerId);
strcpy(buf->cgroup, tmq->groupId);
SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
if (pRequest == NULL) {
tscError("failed to malloc subscribe ep request");
}
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = tmq;
sendInfo->fp = tmq_ask_ep_cb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&tmq->rspSem);
}
if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
return NULL;
}
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq));
pReq->reqType = 1;
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
tmq_message_t* tmq_message = NULL;
strcpy(pReq->cgroup, tmq->groupId);
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
strcpy(pReq->topic, pTopic->topicName);
int32_t nextVgIdx = pTopic->nextVgIdx;
pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx);
pReq->offset = pVg->currentOffset;
pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqConsumeReq));
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
/*sendInfo->param = &tmq_message;*/
sendInfo->fp = tmq_poll_cb_inner;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
return tmq_message;
/*tsem_wait(&pRequest->body.rspSem);*/
/*if (body != NULL) {*/
/*destroySendMsgInfo(body);*/
/*}*/
/*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
/*pRequest->code = terrno;*/
/*}*/
/*return pRequest;*/
return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob);
}
tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
SMqConsumeReq req = {0};
return NULL;
}
void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
}
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
STscObj *pTscObj = (STscObj *)taos;
if (sqlLen > (size_t) tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
if (sqlLen > (size_t) TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
......@@ -1059,8 +539,12 @@ void* doFetchRow(SRequestObj* pRequest) {
tsem_wait(&pRequest->body.rspSem);
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
} else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE && pResultInfo->pData != NULL) {
return NULL;
} else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
if (pResultInfo->completed) {
return NULL;
}
}
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
......
......@@ -78,7 +78,7 @@ void taos_close(TAOS* taos) {
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(clientConnRefPool, pTscObj->id);
/*taosRemoveRef(clientConnRefPool, pTscObj->id);*/
}
int taos_errno(TAOS_RES *tres) {
......@@ -343,4 +343,4 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
// TODO
}
\ No newline at end of file
}
......@@ -76,7 +76,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->connType = HEARTBEAT_TYPE_QUERY;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);*/
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
......@@ -201,6 +201,7 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
pResInfo->pRspMsg = pMsg->pData;
pResInfo->numOfRows = pRetrieve->numOfRows;
pResInfo->pData = pRetrieve->data;
pResInfo->completed = pRetrieve->completed;
pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
......
此差异已折叠。
......@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
clientTest
PUBLIC os util common transport gtest taos qcom
PUBLIC os util common transport parser catalog scheduler function gtest taos qcom
)
TARGET_INCLUDE_DIRECTORIES(
......
......@@ -53,6 +53,7 @@ TEST(testCase, driverInit_Test) {
// taos_init();
}
#if 0
TEST(testCase, connect_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
......@@ -561,9 +562,9 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
#if 0
TEST(testCase, create_topic_Test) {
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -582,13 +583,37 @@ TEST(testCase, create_topic_Test) {
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, tmq_subscribe_Test) {
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -600,26 +625,60 @@ TEST(testCase, tmq_subscribe_Test) {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_topic_1");
tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consume_poll(tmq, 0);
printf("get msg\n");
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
#endif
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_TEST) {
}
#if 0
TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -732,5 +791,6 @@ TEST(testCase, agg_query_tables) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
#pragma GCC diagnostic pop
......@@ -78,7 +78,6 @@ int32_t tsCompressColData = -1;
int32_t tsCompatibleModel = 1;
// client
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN;
int32_t tsMaxRegexStringLen = TSDB_REGEX_STRING_DEFAULT_LEN;
......@@ -594,16 +593,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "maxSQLLength";
cfg.ptr = &tsMaxSQLStringLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MAX_SQL_LEN;
cfg.maxValue = TSDB_MAX_ALLOWED_SQL_LEN;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosAddConfigOption(cfg);
cfg.option = "maxWildCardsLength";
cfg.ptr = &tsMaxWildCardsLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......
......@@ -320,3 +320,18 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
return buf;
}
int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) {
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU8(buf, pReq->type);
return tlen;
}
void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
buf = taosDecodeFixedU64(buf, &pReq->ver);
buf = taosDecodeString(buf, &pReq->name);
buf = taosDecodeFixedU8(buf, &pReq->type);
return buf;
}
......@@ -630,14 +630,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type) {
}
}
#define TSWAP(a, b, c) \
do { \
typeof(a) __tmp = (a); \
(a) = (b); \
(b) = __tmp; \
} while (0)
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf) {
switch (type) {
case TSDB_DATA_TYPE_INT:
......
......@@ -199,19 +199,16 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2];
int32_t bsize = tSerializeSVCreateTbReq(NULL, &req);
void* buf = rpcMallocCont(sizeof(SMsgHead) + bsize);
SMsgHead* pMsgHead = (SMsgHead*)buf;
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
pMsgHead->contLen = htonl(sizeof(SMsgHead) + bsize);
pMsgHead->vgId = htonl(2);
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(2);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req);
int32_t contLen = sizeof(SMsgHead) + bsize;
SRpcMsg* pRsp = test.SendReq(TDMT_VND_CREATE_STB, buf, contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_CREATE_STB, (void*)pHead, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
......@@ -235,20 +232,28 @@ TEST_F(DndTestVnode, 04_ALTER_Stb) {
}
TEST_F(DndTestVnode, 05_DROP_Stb) {
#if 0
{
for (int i = 0; i < 3; ++i) {
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, pReq, contLen);
SVDropTbReq req = {0};
req.ver = 0;
req.name = (char*)"stb1";
req.suid = 9599;
req.type = TD_SUPER_TABLE;
int32_t contLen = tSerializeSVDropTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(2);
void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVDropTbReq(&pBuf, &req);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, (void*)pHead, contLen);
ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_TDB_INVALID_TABLE_ID);
}
ASSERT_EQ(pRsp->code, 0);
}
}
#endif
}
TEST_F(DndTestVnode, 06_DROP_Vnode) {
......
......@@ -393,6 +393,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
return buf;
}
static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) {
if (pConsumerEp) {
tfree(pConsumerEp->qmsg);
}
}
// unit for rebalance
typedef struct SMqSubscribeObj {
char key[TSDB_SUBSCRIBE_KEY_LEN];
......@@ -520,7 +526,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->assigned, &cEp);
}
......@@ -533,7 +539,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->lostConsumer, &cEp);
}
......@@ -547,7 +553,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->idleConsumer, &cEp);
}
......@@ -563,7 +569,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->unassignedVg, &cEp);
}
......@@ -571,6 +577,33 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return buf;
}
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->availConsumer) {
taosArrayDestroy(pSub->availConsumer);
pSub->availConsumer = NULL;
}
if (pSub->assigned) {
//taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy(pSub->assigned);
pSub->assigned = NULL;
}
if (pSub->unassignedVg) {
//taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy(pSub->unassignedVg);
pSub->unassignedVg = NULL;
}
if (pSub->idleConsumer) {
//taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy(pSub->idleConsumer);
pSub->idleConsumer = NULL;
}
if (pSub->lostConsumer) {
//taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
taosArrayDestroy(pSub->lostConsumer);
pSub->lostConsumer = NULL;
}
}
typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
......@@ -604,7 +637,7 @@ typedef struct SMqConsumerTopic {
} SMqConsumerTopic;
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic,
SMqSubscribeObj* pSub) {
SMqSubscribeObj* pSub, int64_t* oldConsumerId) {
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
if (pCTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -617,6 +650,7 @@ static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqT
int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg);
if (unassignedVgSz > 0) {
SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg);
*oldConsumerId = pCEp->consumerId;
pCEp->consumerId = consumerId;
taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId);
taosArrayPush(pSub->assigned, pCEp);
......@@ -660,12 +694,17 @@ typedef struct SMqConsumerObj {
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<SMqConsumerTopic>
// SHashObj *topicHash; //SHashObj<SMqTopicObj>
int64_t epoch;
// stat
int64_t pollCnt;
} SMqConsumerObj;
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
tlen += taosEncodeString(buf, pConsumer->cgroup);
int32_t sz = taosArrayGetSize(pConsumer->topics);
tlen += taosEncodeFixedI32(buf, sz);
......@@ -678,6 +717,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO
static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) {
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeFixedI64(buf, &pConsumer->connId);
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
......
......@@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
......@@ -54,13 +55,14 @@ void mndCleanupConsumer(SMnode *pMnode) {}
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) goto CM_ENCODE_OVER;
void *buf = malloc(tlen);
buf = malloc(tlen);
if (buf == NULL) goto CM_ENCODE_OVER;
void *abuf = buf;
......@@ -75,6 +77,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_SUCCESS;
CM_ENCODE_OVER:
tfree(buf);
if (terrno != 0) {
mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
sdbFreeRaw(pRaw);
......@@ -87,6 +90,7 @@ CM_ENCODE_OVER:
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
......@@ -105,7 +109,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0;
int32_t len;
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
void *buf = malloc(len);
buf = malloc(len);
if (buf == NULL) goto CM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
......@@ -117,6 +121,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_SUCCESS;
CM_DECODE_OVER:
tfree(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
tfree(pRow);
......
......@@ -181,23 +181,26 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
atomic_exchange_32(&pOld->updateTime, pNew->updateTime);
atomic_exchange_32(&pOld->version, pNew->version);
taosWLockLatch(&pOld->lock);
pOld->numOfColumns = pNew->numOfColumns;
pOld->numOfTags = pNew->numOfTags;
int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
int32_t totalSize = totalCols * sizeof(SSchema);
if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
void *pSchema = malloc(totalSize);
if (pSchema != NULL) {
free(pOld->pSchema);
pOld->pSchema = pSchema;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
taosWUnLockLatch(&pOld->lock);
}
}
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
pOld->numOfColumns = pNew->numOfColumns;
pOld->numOfTags = pNew->numOfTags;
memcpy(pOld->pSchema, pNew->pSchema, totalSize);
taosWUnLockLatch(&pOld->lock);
return 0;
......@@ -228,15 +231,11 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
}
static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SVCreateTbReq req;
void *buf;
int32_t bsize;
SMsgHead *pMsgHead;
req.ver = 0;
SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVCreateTbReq req = {0};
req.ver = 0;
req.name = (char *)tNameGetTableName(&name);
req.ttl = 0;
req.keep = 0;
......@@ -247,40 +246,48 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
req.stbCfg.nTagCols = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns;
bsize = tSerializeSVCreateTbReq(NULL, &req);
buf = malloc(sizeof(SMsgHead) + bsize);
if (buf == NULL) {
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = malloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + bsize);
pMsgHead->vgId = htonl(pVgroup->vgId);
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req);
*pContLen = sizeof(SMsgHead) + bsize;
return buf;
*pContLen = contLen;
return pHead;
}
static SVDropTbReq *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SVDropTbReq);
static void *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVDropTbReq req = {0};
req.ver = 0;
req.name = (char *)tNameGetTableName(&name);
req.type = TD_SUPER_TABLE;
req.suid = pStb->uid;
SVDropTbReq *pDrop = calloc(1, contLen);
if (pDrop == NULL) {
int32_t contLen = tSerializeSVDropTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = malloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pDrop->head.contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
pDrop->suid = htobe64(pStb->uid);
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
return pDrop;
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVDropTbReq(&pBuf, &req);
*pContLen = contLen;
return pHead;
}
static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
......@@ -358,7 +365,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
int32_t contLen;
int32_t contLen;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
......@@ -400,7 +407,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
SVDropTbReq *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb);
int32_t contLen = 0;
void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
......@@ -411,7 +419,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pReq;
action.contLen = sizeof(SVDropTbReq);
action.contLen = contLen;
action.msgType = TDMT_VND_DROP_STB;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pReq);
......@@ -498,9 +506,9 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
}
// topic should have different name with stb
SStbObj *pTopic = mndAcquireStb(pMnode, pCreate->name);
if (pTopic != NULL) {
sdbRelease(pMnode->pSdb, pTopic);
SStbObj *pTopicStb = mndAcquireStb(pMnode, pCreate->name);
if (pTopicStb != NULL) {
mndReleaseStb(pMnode, pTopicStb);
terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC;
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
......@@ -517,7 +525,6 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
mndReleaseDb(pMnode, pDb);
if (code != 0) {
terrno = code;
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
......@@ -603,15 +610,6 @@ static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pS
return 0;
}
static int32_t mndSetDropStbUndoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL) return -1;
......@@ -621,22 +619,54 @@ static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *
return 0;
}
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
int32_t contLen;
static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pStb) {
int32_t contLen = 0;
void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_DROP_STB;
action.acceptableCode = TSDB_CODE_VND_TB_NOT_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pReq);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL)goto DROP_STB_OVER;
if (pTrans == NULL) goto DROP_STB_OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto DROP_STB_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_STB_OVER;
code = 0;
......@@ -664,7 +694,16 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
}
}
int32_t code = mndDropStb(pMnode, pReq, pStb);
SDbObj *pDb = mndAcquireDbByStb(pMnode, pDrop->name);
if (pDb == NULL) {
mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to drop since %s", pDrop->name, terrstr());
return -1;
}
int32_t code = mndDropStb(pMnode, pReq, pDb, pStb);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
if (code != 0) {
......@@ -860,7 +899,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
if (pStb->dbUid != pDb->uid) {
if (strncmp(pStb->db, pDb->name, tListLen(pStb->db)) == 0) {
mError("Inconsistent table data, name:%s, db:%s, dbUid:%"PRIu64, pStb->name, pDb->name, pDb->uid);
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid);
}
sdbRelease(pSdb, pStb);
......
......@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndSubscribe.h"
#include "mndConsumer.h"
......@@ -30,6 +31,8 @@
#define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_SUBSCRIBE_REBALANCE_MS 5000
static char *mndMakeSubscribeKey(char *cgroup, char *topicName);
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
......@@ -46,7 +49,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub);
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub,
int64_t oldConsumerId);
int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
......@@ -67,8 +71,9 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp;
SMqCMGetSubEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId);
int64_t currentTs = taosGetTimestampMs();
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
if (pConsumer == NULL) {
......@@ -79,6 +84,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy(rsp.cgroup, pReq->cgroup);
rsp.consumerId = consumerId;
rsp.epoch = pConsumer->epoch;
SArray *pTopics = pConsumer->topics;
int32_t sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
......@@ -88,21 +94,39 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy(topicEp.topic, pConsumerTopic->name);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name);
int32_t assignedSz = taosArrayGetSize(pSub->assigned);
ASSERT(pSub);
bool found = 0;
bool changed = 0;
for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t *)taosArrayGet(pSub->availConsumer, j) == consumerId) {
found = 1;
break;
}
}
if (found == 0) {
taosArrayPush(pSub->availConsumer, &consumerId);
}
int32_t assignedSz = taosArrayGetSize(pSub->assigned);
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
for (int32_t j = 0; j < assignedSz; j++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j);
if (pCEp->consumerId == consumerId) {
SMqSubVgEp vgEp = {
.epSet = pCEp->epSet,
.vgId = pCEp->vgId
};
pCEp->lastConsumerHbTs = currentTs;
SMqSubVgEp vgEp = {.epSet = pCEp->epSet, .vgId = pCEp->vgId};
taosArrayPush(topicEp.vgs, &vgEp);
changed = 1;
}
}
if (taosArrayGetSize(topicEp.vgs) != 0) {
taosArrayPush(rsp.topics, &topicEp);
}
if (changed || found) {
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWrite(pMnode->pSdb, pRaw);
}
mndReleaseSubscribe(pMnode, pSub);
}
int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen);
......@@ -112,7 +136,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
}
void *abuf = buf;
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
//TODO: free rsp
tDeleteSMqCMGetSubEpRsp(&rsp);
pMsg->pCont = buf;
pMsg->contLen = tlen;
return 0;
......@@ -124,9 +148,9 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
i++;
}
key[i] = 0;
*topic = strdup(key);
*cgroup = strdup(key);
key[i] = ':';
*cgroup = strdup(&key[i + 1]);
*topic = strdup(&key[i + 1]);
return 0;
}
......@@ -135,9 +159,40 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SSdb *pSdb = pMnode->pSdb;
SMqSubscribeObj *pSub = NULL;
void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
int sz;
int64_t currentTs = taosGetTimestampMs();
int32_t sz;
while (pIter != NULL) {
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId;
if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) {
// put consumer into lostConsumer
taosArrayPush(pSub->lostConsumer, pCEp);
// put vg into unassigned
taosArrayPush(pSub->unassignedVg, pCEp);
// remove from assigned
// TODO: swap with last one, reduce size and reset i
taosArrayRemove(pSub->assigned, i);
// remove from available consumer
for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
taosArrayRemove(pSub->availConsumer, j);
break;
}
// TODO: acquire consumer, set status to unavail
}
#if 0
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size);
SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw);
mndReleaseConsumer(pMnode, pConsumer);
#endif
}
}
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0 && taosArrayGetSize(pSub->availConsumer) > 0) {
char *topic = NULL;
char *cgroup = NULL;
mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
......@@ -146,58 +201,70 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// create trans
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
for (int i = 0; i < sz; i++) {
for (int32_t i = 0; i < sz; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
int64_t oldConsumerId = pCEp->consumerId;
pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
/*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
/*sdbWriteNotFree(pMnode->pSdb, pConsumerRaw);*/
mndReleaseConsumer(pMnode, pConsumer);
// build msg
SMqSetCVgReq *pReq = malloc(sizeof(SMqSetCVgReq));
if (pReq == NULL) {
SMqSetCVgReq req = {0};
strcpy(req.cgroup, cgroup);
strcpy(req.topicName, topic);
req.sql = pTopic->sql;
req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan;
req.qmsg = pCEp->qmsg;
req.oldConsumerId = oldConsumerId;
req.newConsumerId = consumerId;
req.vgId = pCEp->vgId;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pReq->cgroup, cgroup);
strcpy(pReq->topicName, topic);
pReq->sql = strdup(pTopic->sql);
pReq->logicalPlan = strdup(pTopic->logicalPlan);
pReq->physicalPlan = strdup(pTopic->physicalPlan);
pReq->qmsg = strdup(pCEp->qmsg);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
void *reqStr = malloc(tlen);
if (reqStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *abuf = reqStr;
tEncodeSMqSetCVgReq(&abuf, pReq);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(pCEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req);
// persist msg
// TODO: no need for txn
STransAction action = {0};
action.epSet = pCEp->epSet;
action.pCont = reqStr;
action.contLen = tlen;
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + tlen;
action.msgType = TDMT_VND_MQ_SET_CONN;
mndTransAppendRedoAction(pTrans, &action);
// persist raw
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw);
free(pReq);
tfree(topic);
tfree(cgroup);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
}
/*mndReleaseTopic(pMnode, pTopic);*/
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
}
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
}
return 0;
}
......@@ -205,33 +272,49 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
// convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray;
SArray *pArray = NULL;
SArray *inner = taosArrayGet(pDag->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0);
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
plan->execNode.nodeId = 2;
SEpSet* pEpSet = &plan->execNode.epset;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) continue;
pEpSet->inUse = 0;
addEpIntoEpSet(pEpSet, "localhost", 6030);
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
return -1;
}
int32_t sz = taosArrayGetSize(pArray);
// convert dag to msg
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp CEp;
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
return -1;
}
if (pArray && taosArrayGetSize(pArray) != 1) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));
return -1;
}
SMqConsumerEp CEp = {0};
CEp.status = 0;
CEp.consumerId = -1;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo *pTaskInfo = taosArrayGet(pArray, i);
STaskInfo *pTaskInfo = taosArrayGet(pArray, 0);
CEp.epSet = pTaskInfo->addr.epset;
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1],
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp.vgId = pTaskInfo->addr.nodeId;
ASSERT(CEp.vgId == pVgroup->vgId);
CEp.qmsg = strdup(pTaskInfo->msg->msg);
taosArrayPush(unassignedVg, &CEp);
// TODO: free taskInfo
taosArrayDestroy(pArray);
/*SEpSet *pEpSet = &plan->execNode.epset;*/
/*pEpSet->inUse = 0;*/
/*addEpIntoEpSet(pEpSet, "localhost", 6030);*/
}
/*qDestroyQueryDag(pDag);*/
......@@ -239,14 +322,15 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
}
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp) {
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp,
int64_t oldConsumerId) {
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SMqSetCVgReq req = {
.vgId = vgId,
.oldConsumerId = -1,
.oldConsumerId = oldConsumerId,
.newConsumerId = pConsumer->consumerId,
};
strcpy(req.cgroup, pConsumer->cgroup);
......@@ -289,13 +373,14 @@ void mndCleanupSubscribe(SMnode *pMnode) {}
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
if (pRaw == NULL) goto SUB_ENCODE_OVER;
void *buf = malloc(tlen);
buf = malloc(tlen);
if (buf == NULL) goto SUB_ENCODE_OVER;
void *abuf = buf;
......@@ -310,6 +395,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
terrno = TSDB_CODE_SUCCESS;
SUB_ENCODE_OVER:
tfree(buf);
if (terrno != 0) {
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
sdbFreeRaw(pRaw);
......@@ -322,6 +408,7 @@ SUB_ENCODE_OVER:
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
......@@ -341,7 +428,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0;
int32_t tlen;
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
void *buf = malloc(tlen + 1);
buf = malloc(tlen + 1);
if (buf == NULL) goto SUB_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
......@@ -353,6 +440,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_SUCCESS;
SUB_DECODE_OVER:
tfree(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
// TODO free subscribeobj
......@@ -370,6 +458,7 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
mTrace("subscribe:%s, perform delete action", pSub->key);
tDeleteSMqSubscribeObj(pSub);
return 0;
}
......@@ -378,10 +467,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
return 0;
}
static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
return 0;
}
static char *mndMakeSubscribeKey(char *cgroup, char *topicName) {
char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
if (key == NULL) {
......@@ -435,10 +520,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
strcpy(pConsumer->cgroup, consumerGroup);
taosInitRWLatch(&pConsumer->lock);
} else {
pConsumer->epoch++;
oldSub = pConsumer->topics;
}
pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
......@@ -542,6 +629,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
bool create = false;
if (pSub == NULL) {
mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName);
pSub = tNewSubscribeObj();
......@@ -550,14 +638,24 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return -1;
}
char *key = mndMakeSubscribeKey(consumerGroup, newTopicName);
if (key == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pSub->key, key);
free(key);
// set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) {
// TODO: free memory
return -1;
}
// TODO: disable alter
create = true;
}
taosArrayPush(pSub->availConsumer, &consumerId);
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
int64_t oldConsumerId;
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub, &oldConsumerId);
taosArrayPush(pConsumer->topics, pConsumerTopic);
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
......@@ -565,7 +663,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned);
if (pCEp->vgId == vgId) {
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) {
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp, oldConsumerId) < 0) {
// TODO
return -1;
}
......@@ -576,6 +674,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw);
if (!create) mndReleaseSubscribe(pMnode, pSub);
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
......
......@@ -103,6 +103,7 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER;
}
}
code = 0;
......
......@@ -240,15 +240,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
topicObj.dbUid = pDb->uid;
topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql);
topicObj.physicalPlan = strdup(pCreate->physicalPlan);
topicObj.logicalPlan = strdup(pCreate->logicalPlan);
topicObj.sql = pCreate->sql;
topicObj.physicalPlan = pCreate->physicalPlan;
topicObj.logicalPlan = pCreate->logicalPlan;
topicObj.sqlLen = strlen(pCreate->sql);
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
if (pTopicRaw == NULL) return -1;
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
// TODO: replace with trans to support recovery
return sdbWrite(pMnode->pSdb, pTopicRaw);
}
......
......@@ -35,71 +35,8 @@
extern "C" {
#endif
typedef struct STqMsgHead {
int32_t protoVer;
int32_t msgType;
int64_t cgId;
int64_t clientId;
} STqMsgHead;
typedef struct STqOneAck {
int64_t topicId;
int64_t consumeOffset;
} STqOneAck;
typedef struct STqAcks {
int32_t ackNum;
// should be sorted
STqOneAck acks[];
} STqAcks;
typedef struct STqSetCurReq {
STqMsgHead head;
int64_t topicId;
int64_t offset;
} STqSetCurReq;
typedef struct STqConsumeReq {
STqMsgHead head;
int64_t blockingTime; // milisec
STqAcks acks;
} STqConsumeReq;
typedef struct STqMsgContent {
int64_t topicId;
int64_t msgLen;
char msg[];
} STqMsgContent;
typedef struct STqConsumeRsp {
STqMsgHead head;
int64_t bodySize;
STqMsgContent msgs[];
} STqConsumeRsp;
typedef struct STqSubscribeReq {
STqMsgHead head;
int32_t topicNum;
int64_t topic[];
} STqSubscribeReq;
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
#define TQ_BUFFER_SIZE 8
typedef struct STqExec {
void* runtimeEnv;
SSDataBlock* (*exec)(void* runtimeEnv);
void* (*assign)(void* runtimeEnv, void* inputData);
void (*clear)(void* runtimeEnv);
char* (*serialize)(struct STqExec*);
struct STqExec* (*deserialize)(char*);
} STqExec;
typedef struct STqRspHandle {
void* handle;
void* ahandle;
......@@ -107,47 +44,6 @@ typedef struct STqRspHandle {
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
typedef struct STqTopic STqTopic;
typedef struct STqBufferItem {
int64_t offset;
// executors are identical but not concurrent
// so there must be a copy in each item
STqExec* executor;
int32_t status;
int64_t size;
void* content;
STqTopic* pTopic;
} STqMsgItem;
struct STqTopic {
// char* topic; //c style, end with '\0'
// int64_t cgId;
// void* ahandle;
// int32_t head;
// int32_t tail;
int64_t nextConsumeOffset;
int64_t floatingCursor;
int64_t topicId;
void* logReader;
STqMsgItem buffer[TQ_BUFFER_SIZE];
};
typedef struct STqListHandle {
STqTopic topic;
struct STqListHandle* next;
} STqList;
typedef struct STqGroup {
int64_t clientId;
int64_t cgId;
void* ahandle;
int32_t topicNum;
STqList* head;
SList* topicList; // SList<STqTopic>
STqRspHandle rspHandle;
} STqGroup;
typedef struct STqTaskItem {
int8_t status;
int64_t offset;
......@@ -182,11 +78,6 @@ typedef struct STqConsumerHandle {
SArray* topics; // SArray<STqClientTopic>
} STqConsumerHandle;
typedef struct STqQueryMsg {
STqMsgItem* item;
struct STqQueryMsg* next;
} STqQueryMsg;
typedef struct STqMemRef {
SMemAllocatorFactory* pAllocatorFactory;
SMemAllocator* pAllocator;
......@@ -305,20 +196,6 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*);
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
#if 0
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset);
STqTopic* tqFindTopic(STqGroup*, int64_t topicId);
STqGroup* tqGetGroup(STQ*, int64_t clientId);
STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqRegisterContext(STqGroup*, void* ahandle);
int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
#endif
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
......
......@@ -91,7 +91,7 @@ int tsdbCommit(STsdb *pTsdb);
int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *);
typedef void* tsdbReadHandleT;
typedef void* tsdbReaderT;
/**
* Get the data block iterator, starting from position according to the query condition
......@@ -103,7 +103,7 @@ typedef void* tsdbReadHandleT;
* @param qinfo query info handle from query processor
* @return
*/
tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId);
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
......@@ -115,13 +115,13 @@ tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroup
* @param tableInfo table list.
* @return
*/
//tsdbReadHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
//tsdbReaderT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef);
tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle);
/**
*
......@@ -138,7 +138,7 @@ bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
* @param reqId
* @return
*/
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
/**
......@@ -148,7 +148,7 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle);
/**
* move to next block if exists
......@@ -156,7 +156,7 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle);
* @param pTsdbReadHandle
* @return
*/
bool tsdbNextDataBlock(tsdbReadHandleT pTsdbReadHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
/**
* Get current data block information
......@@ -165,7 +165,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pTsdbReadHandle);
* @param pBlockInfo
* @return
*/
void tsdbRetrieveDataBlockInfo(tsdbReadHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
/**
*
......@@ -177,7 +177,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReadHandleT *pTsdbReadHandle, SDataBlockInfo
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis);
/**
*
......@@ -189,7 +189,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT *pTsdbReadHandle, SDataS
* @param pColumnIdList required data columns id list
* @return
*/
SArray *tsdbRetrieveDataBlock(tsdbReadHandleT *pTsdbReadHandle, SArray *pColumnIdList);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
/**
* destroy the created table group list, which is generated by tag query
......@@ -205,7 +205,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
......@@ -220,7 +220,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
#ifdef __cplusplus
}
......
......@@ -40,7 +40,6 @@ typedef struct STqCfg {
int32_t reserved;
} STqCfg;
typedef struct SVnodeCfg {
int32_t vgId;
SDnode *pDnode;
......@@ -69,17 +68,18 @@ typedef struct {
} SVnodeOpt;
typedef struct STqReadHandle {
int64_t ver;
uint64_t tbUid;
SSubmitMsg* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pVnodeMeta;
SArray* pColIdList; //SArray<int32_t>
int32_t sver;
SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema;
int64_t ver;
uint64_t tbUid;
SHashObj *tbIdHash;
const SSubmitMsg *pMsg;
SSubmitBlk *pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta *pVnodeMeta;
SArray *pColIdList; // SArray<int32_t>
int32_t sver;
SSchemaWrapper *pSchemaWrapper;
STSchema *pSchema;
} STqReadHandle;
/* ------------------------ SVnode ------------------------ */
......@@ -201,22 +201,34 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/* ------------------------- TQ QUERY -------------------------- */
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta);
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) {
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) {
pReadHandle->pColIdList = pColIdList;
}
static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) {
pHandle->tbUid = tbUid;
// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, const SArray* pTableIdList) {
// pHandle->tbUid = pTableIdList;
//}
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
return -1;
}
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
// pHandle->tbUid = tbUid;
}
return 0;
}
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
void tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitMsg *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
// return SArray<SColumnInfoData>
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle);
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
#ifdef __cplusplus
}
......
......@@ -16,9 +16,9 @@
#ifndef _TD_TQ_INT_H_
#define _TD_TQ_INT_H_
#include "tq.h"
#include "meta.h"
#include "tlog.h"
#include "tq.h"
#include "trpc.h"
#ifdef __cplusplus
extern "C" {
......@@ -26,29 +26,48 @@ extern "C" {
extern int32_t tqDebugFlag;
#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }}
#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }}
#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }}
#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }}
#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
// create persistent storage for meta info such as consuming offset
// return value > 0: cgId
// return value <= 0: error code
// int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle);
// create ring buffer in memory and load consuming offset
// int tqOpenTCGroup(STQ*, const char* topic, int cgId);
// destroy ring buffer and persist consuming offset
// int tqCloseTCGroup(STQ*, const char* topic, int cgId);
// delete persistent storage for meta info
// int tqDropTCGroup(STQ*, const char* topic, int cgId);
//int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
//const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
#define tqFatal(...) \
{ \
if (tqDebugFlag & DEBUG_FATAL) { \
taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); \
} \
}
#define tqError(...) \
{ \
if (tqDebugFlag & DEBUG_ERROR) { \
taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); \
} \
}
#define tqWarn(...) \
{ \
if (tqDebugFlag & DEBUG_WARN) { \
taosPrintLog("TQ WARN ", 255, __VA_ARGS__); \
} \
}
#define tqInfo(...) \
{ \
if (tqDebugFlag & DEBUG_INFO) { \
taosPrintLog("TQ ", 255, __VA_ARGS__); \
} \
}
#define tqDebug(...) \
{ \
if (tqDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \
} \
}
#define tqTrace(...) \
{ \
if (tqDebugFlag & DEBUG_TRACE) { \
taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \
} \
}
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
#ifdef __cplusplus
}
#endif
......
......@@ -40,6 +40,7 @@ int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize);
// delete committed kv pair
// notice that a delete action still needs to be committed
int32_t tqHandleDel(STqMetaStore*, int64_t key);
int32_t tqHandlePurge(STqMetaStore*, int64_t key);
int32_t tqHandleCommit(STqMetaStore*, int64_t key);
int32_t tqHandleAbort(STqMetaStore*, int64_t key);
......
此差异已折叠。
......@@ -584,12 +584,30 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) {
if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
if (pNode->handle.valueInTxn) {
pMeta->pDeleter(pNode->handle.valueInTxn);
if (pNode->handle.key == key) {
if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
if (pNode->handle.valueInTxn) {
pMeta->pDeleter(pNode->handle.valueInTxn);
}
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode);
return 0;
}
} else {
pNode = pNode->next;
}
}
terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
return -1;
}
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
int32_t tqHandlePurge(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) {
if (pNode->handle.key == key) {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode);
return 0;
} else {
......
......@@ -26,12 +26,14 @@ int vnodeQueryOpen(SVnode *pVnode) {
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in query queue is processing");
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta};
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_QUERY:{
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
}
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......
......@@ -16,6 +16,7 @@
#include "tq.h"
#include "vnd.h"
#if 0
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_VND_MQ_SET_CUR:
......@@ -26,6 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
}
return 0;
}
#endif
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg *pMsg;
......@@ -103,7 +105,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
taosArrayDestroy(vCreateTbBatchReq.pArray);
break;
case TDMT_VND_ALTER_STB:
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
break;
case TDMT_VND_DROP_STB:
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
break;
case TDMT_VND_DROP_TABLE:
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// // TODO: handle error
......
......@@ -39,7 +39,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
}
}
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) {
if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR;
}
......@@ -50,7 +50,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo));
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*) input, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else {
......
......@@ -51,25 +51,7 @@ static void freeqinfoFn(void *qhandle) {
qDestroyTask(*handle);
}
void freeParam(STaskParam *param) {
tfree(param->sql);
tfree(param->tagCond);
tfree(param->tbnameCond);
tfree(param->pTableIdList);
taosArrayDestroy(param->pOperator);
tfree(param->pExprs);
tfree(param->pSecExprs);
tfree(param->pExpr);
tfree(param->pSecExpr);
tfree(param->pGroupColIndex);
tfree(param->pTagColumnInfo);
tfree(param->pGroupbyExpr);
tfree(param->prevResult);
}
int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
......
......@@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) {
SExecTaskInfo* pTaskInfo = nullptr;
DataSinkHandle sinkHandle = nullptr;
int32_t code = qCreateExecTask((void*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
int32_t code = qCreateExecTask((SReadHandle*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册