提交 c74b88af 编写于 作者: L Liu Jicong

put tq header input vnd

上级 afe84390
......@@ -19,9 +19,6 @@
#include <time.h>
#include "taos.h"
static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
......@@ -91,124 +88,6 @@ int32_t create_stream() {
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
printf("commit %d\n", resp);
}
tmq_t* build_consumer() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
return topic_list;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
/*int32_t cnt = 0;*/
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
/*cnt++;*/
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0;
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
return;
}
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
}
}
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t batchCnt = 0;
int32_t skipLogNum = 0;
clock_t startTime = clock();
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage);
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
} else {
break;
}
}
clock_t endTime = clock();
printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
(double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
int code;
if (argc > 1) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TQ_H_
#define _TQ_H_
#include "executor.h"
#include "meta.h"
#include "taoserror.h"
#include "tcommon.h"
#include "tmallocator.h"
#include "tmsg.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#include "vnode.h"
#include "wal.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct STQ STQ;
// memory allocator provided by vnode
typedef struct {
SMemAllocatorFactory* pAllocatorFactory;
SMemAllocator* pAllocator;
} STqMemRef;
// init once
int tqInit();
void tqCleanUp();
// open in each vnode
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
void tqClose(STQ*);
// required by vnode
int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
#ifdef __cplusplus
}
#endif
#endif /*_TQ_H_*/
......@@ -18,8 +18,8 @@
#include "meta.h"
#include "tlog.h"
#include "tq.h"
#include "tqPush.h"
#include "vnd.h"
#ifdef __cplusplus
extern "C" {
......@@ -153,6 +153,11 @@ typedef struct {
FTqDelete pDeleter;
} STqMetaStore;
typedef struct {
SMemAllocatorFactory* pAllocatorFactory;
SMemAllocator* pAllocator;
} STqMemRef;
struct STQ {
// the collection of groups
// the handle of meta kvstore
......
......@@ -23,7 +23,6 @@
#include "tlist.h"
#include "tlockfree.h"
#include "tmacro.h"
#include "tq.h"
#include "wal.h"
#include "vnode.h"
......@@ -34,6 +33,8 @@
extern "C" {
#endif
typedef struct STQ STQ;
typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool;
......@@ -171,6 +172,25 @@ void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
void vmaFree(SVMemAllocator* pVMA, void* ptr);
bool vmaIsFull(SVMemAllocator* pVMA);
// init once
int tqInit();
void tqCleanUp();
// open in each vnode
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
void tqClose(STQ*);
// required by vnode
int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
#ifdef __cplusplus
}
#endif
......
......@@ -14,6 +14,7 @@
*/
#include "vnodeQuery.h"
#include "executor.h"
#include "vnd.h"
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
......
......@@ -13,12 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tq.h"
#include "vnd.h"
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SNodeMsg *pMsg;
SRpcMsg *pRpc;
SRpcMsg *pRpc;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
......
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
#include "tq.h"
using namespace std;
TEST(TqSerializerTest, basicTest) {
TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle));
}
......@@ -16,7 +16,7 @@
#include "executor.h"
#include "executorimpl.h"
#include "planner.h"
#include "tq.h"
#include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) {
ASSERT(pOperator != NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册