未验证 提交 bedf272e 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12048 from taosdata/feature/tq

fix: subscrption api
...@@ -140,7 +140,7 @@ int32_t create_topic() { ...@@ -140,7 +140,7 @@ int32_t create_topic() {
return 0; return 0;
} }
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) { void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
printf("commit %d\n", resp); printf("commit %d\n", resp);
} }
...@@ -161,7 +161,7 @@ tmq_t* build_consumer() { ...@@ -161,7 +161,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/ /*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq); assert(tmq);
return tmq; return tmq;
...@@ -215,6 +215,17 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -215,6 +215,17 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
return; return;
} }
tmq_list_t* subList = NULL;
tmq_subscription(tmq, &subList);
char** subTopics = tmq_list_to_c_array(subList);
int32_t sz = tmq_list_get_size(subList);
printf("subscribed topics: ");
for (int32_t i = 0; i < sz; i++) {
printf("%s, ", subTopics[i]);
}
printf("\n");
tmq_list_destroy(subList);
while (running) { while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) { if (tmqmessage) {
......
...@@ -213,7 +213,7 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; ...@@ -213,7 +213,7 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
...@@ -253,12 +253,12 @@ typedef enum tmq_conf_res_t tmq_conf_res_t; ...@@ -253,12 +253,12 @@ typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO // TODO
#if 0 #if 0
DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
......
...@@ -68,6 +68,7 @@ struct tmq_conf_t { ...@@ -68,6 +68,7 @@ struct tmq_conf_t {
char* pass; char* pass;
char* db; char* db;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam;
}; };
struct tmq_t { struct tmq_t {
...@@ -78,7 +79,8 @@ struct tmq_t { ...@@ -78,7 +79,8 @@ struct tmq_t {
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
int64_t consumerId; int64_t consumerId;
tmq_commit_cb* commit_cb; tmq_commit_cb* commitCb;
void* commitCbUserParam;
// status // status
int8_t status; int8_t status;
...@@ -372,8 +374,8 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -372,8 +374,8 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
if (pParam->tmq->commit_cb) { if (pParam->tmq->commitCb) {
pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL); pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, NULL, pParam->tmq->commitCbUserParam);
} }
if (!pParam->async) tsem_post(&pParam->rspSem); if (!pParam->async) tsem_post(&pParam->rspSem);
return 0; return 0;
...@@ -384,7 +386,7 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { ...@@ -384,7 +386,7 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
*topics = tmq_list_new(); *topics = tmq_list_new();
} }
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i); SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
tmq_list_append(*topics, topic->topicName); tmq_list_append(*topics, topic->topicName);
} }
return TMQ_RESP_ERR__SUCCESS; return TMQ_RESP_ERR__SUCCESS;
...@@ -477,7 +479,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -477,7 +479,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->autoCommit; pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commit_cb = conf->commitCb; pTmq->commitCb = conf->commitCb;
pTmq->commitCbUserParam = conf->commitCbUserParam;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
// assign consumerId // assign consumerId
...@@ -688,9 +691,10 @@ FAIL: ...@@ -688,9 +691,10 @@ FAIL:
return code; return code;
} }
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
// //
conf->commitCb = cb; conf->commitCb = cb;
conf->commitCbUserParam = param;
} }
#if 0 #if 0
...@@ -1306,7 +1310,7 @@ const char* tmq_err2str(tmq_resp_err_t err) { ...@@ -1306,7 +1310,7 @@ const char* tmq_err2str(tmq_resp_err_t err) {
return "fail"; return "fail";
} }
char* tmq_get_topic_name(TAOS_RES* res) { const char* tmq_get_topic_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) { if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res; SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->topic; return pRspObj->topic;
......
...@@ -144,6 +144,10 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM ...@@ -144,6 +144,10 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
int32_t vgId = pRebVg->pVgEp->vgId; int32_t vgId = pRebVg->pVgEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
if (pVgObj == NULL) {
taosMemoryFree(buf);
return -1;
}
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
...@@ -509,7 +513,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { ...@@ -509,7 +513,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
// TODO replace assert with error check // TODO replace assert with error check
ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("persist rebalance output error, possibly vnode splitted or dropped");
}
if (rebInput.pTopic) { if (rebInput.pTopic) {
SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic; SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic;
......
...@@ -91,16 +91,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -91,16 +91,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
int32_t sversion = 0; int32_t sversion = 0;
if (pHandle->sver != sversion) { if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
#if 0
tb_uid_t quid; // this interface use suid instead of uid
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->msgIter.uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
} else {
quid = pHandle->msgIter.uid;
}
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
#endif
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
pHandle->sver = sversion; pHandle->sver = sversion;
} }
......
...@@ -45,6 +45,47 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { ...@@ -45,6 +45,47 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#endif #endif
return 0; return 0;
} }
static void tdSRowDemo() {
#define DEMO_N_COLS 3
int16_t schemaVersion = 0;
int32_t numOfCols = DEMO_N_COLS; // ts + int
SRowBuilder rb = {0};
SSchema schema[DEMO_N_COLS] = {
{.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = SCHEMA_SMA_ON},
{.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = SCHEMA_SMA_ON},
{.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = SCHEMA_SMA_ON}};
SSchema *pSchema = schema;
STSchema *pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols);
tdSRowInit(&rb, schemaVersion);
tdSRowSetTpInfo(&rb, numOfCols, pTSChema->flen);
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSChema);
void *row = taosMemoryCalloc(1, maxLen); // make sure the buffer is enough
// set row buf
tdSRowResetBuf(&rb, row);
for (int32_t idx = 0; idx < pTSChema->numOfCols; ++idx) {
STColumn *pColumn = pTSChema->columns + idx;
if (idx == 0) {
int64_t tsKey = 1651234567;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &tsKey, true, pColumn->offset, idx);
} else if (idx == 1) {
int32_t val1 = 10;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &val1, true, pColumn->offset, idx);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, true, pColumn->offset, idx);
}
}
// print
tdSRowPrint(row, pTSChema, __func__);
taosMemoryFree(pTSChema);
}
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL; void *ptr = NULL;
......
...@@ -81,7 +81,7 @@ ...@@ -81,7 +81,7 @@
./test.sh -f tsim/insert/backquote.sim -m ./test.sh -f tsim/insert/backquote.sim -m
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
./test.sh -f tsim/query/interval-offset.sim -m ./test.sh -f tsim/query/interval-offset.sim -m
./test.sh -f tsim/tmq/basic3.sim -m #./test.sh -f tsim/tmq/basic3.sim -m
./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m ./test.sh -f tsim/qnode/basic1.sim -m
./test.sh -f tsim/mnode/basic1.sim -m ./test.sh -f tsim/mnode/basic1.sim -m
......
...@@ -29,49 +29,49 @@ ...@@ -29,49 +29,49 @@
#define NC "\033[0m" #define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
typedef struct { typedef struct {
TdThread thread; TdThread thread;
int32_t consumerId; int32_t consumerId;
int32_t ifCheckData; int32_t ifCheckData;
int64_t expectMsgCnt; int64_t expectMsgCnt;
int64_t consumeMsgCnt;
int64_t consumeRowCnt;
int32_t checkresult;
char topicString[1024]; int64_t consumeMsgCnt;
char keyString[1024]; int64_t consumeRowCnt;
int32_t checkresult;
int32_t numOfTopic; char topicString[1024];
char topics[32][64]; char keyString[1024];
int32_t numOfKey; int32_t numOfTopic;
char key[32][64]; char topics[32][64];
char value[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
tmq_t* tmq; tmq_t* tmq;
tmq_list_t* topicList; tmq_list_t* topicList;
} SThreadInfo; } SThreadInfo;
typedef struct { typedef struct {
// input from argvs // input from argvs
char cdbName[32]; char cdbName[32];
char dbName[32]; char dbName[32];
int32_t showMsgFlag; int32_t showMsgFlag;
int32_t showRowFlag; int32_t showRowFlag;
int32_t consumeDelay; // unit s int32_t consumeDelay; // unit s
int32_t numOfThread; int32_t numOfThread;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL; TdFilePtr g_fp = NULL;
// char* g_pRowValue = NULL; // char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL; // TdFilePtr g_fp = NULL;
...@@ -93,7 +93,6 @@ static void printHelp() { ...@@ -93,7 +93,6 @@ static void printHelp() {
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
void initLogFile() { void initLogFile() {
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
char file[256]; char file[256];
...@@ -106,36 +105,35 @@ void initLogFile() { ...@@ -106,36 +105,35 @@ void initLogFile() {
g_fp = pFile; g_fp = pFile;
} }
void saveConfigToLogFile() { void saveConfigToLogFile() {
time_t tTime = taosGetTimestampSec(); time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL); struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(g_fp, "###################################################################\n"); taosFprintfFile(g_fp, "###################################################################\n");
taosFprintfFile(g_fp, "# configDir: %s\n", configDir); taosFprintfFile(g_fp, "# configDir: %s\n", configDir);
taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName); taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName); taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag); taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag);
taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread); taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(g_fp, " Topics: "); taosFprintfFile(g_fp, " Topics: ");
for (int j = 0 ; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
} }
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, "\n");
taosFprintfFile(g_fp, " Key: "); taosFprintfFile(g_fp, " Key: ");
for (int k = 0 ; k < g_stConfInfo.stThreads[i].numOfKey; k++) { for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
} }
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, "\n");
} }
taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
taosFprintfFile(g_fp, "###################################################################\n"); taosFprintfFile(g_fp, "###################################################################\n");
} }
...@@ -168,7 +166,7 @@ void parseArgument(int32_t argc, char* argv[]) { ...@@ -168,7 +166,7 @@ void parseArgument(int32_t argc, char* argv[]) {
} }
initLogFile(); initLogFile();
taosFprintfFile(g_fp, "====parseArgument() success\n"); taosFprintfFile(g_fp, "====parseArgument() success\n");
#if 1 #if 1
...@@ -203,26 +201,26 @@ void ltrim(char* str) { ...@@ -203,26 +201,26 @@ void ltrim(char* str) {
// return str; // return str;
} }
static int running = 1; static int running = 1;
static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) { static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) {
char buf[1024]; char buf[1024];
int32_t totalRows = 0; int32_t totalRows = 0;
//printf("topic: %s\n", tmq_get_topic_name(msg)); // printf("topic: %s\n", tmq_get_topic_name(msg));
//printf("vg:%d\n", tmq_get_vgroup_id(msg)); // printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable); taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
if (0 != g_stConfInfo.showRowFlag) { if (0 != g_stConfInfo.showRowFlag) {
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
} }
totalRows++; totalRows++;
} }
return totalRows; return totalRows;
...@@ -241,43 +239,43 @@ int queryDB(TAOS* taos, char* command) { ...@@ -241,43 +239,43 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) { static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
printf("tmq_commit_cb_print() commit %d\n", resp); printf("tmq_commit_cb_print() commit %d\n", resp);
} }
void build_consumer(SThreadInfo *pInfo) { void build_consumer(SThreadInfo* pInfo) {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
//tmq_conf_set(conf, "td.connect.ip", "localhost"); // tmq_conf_set(conf, "td.connect.ip", "localhost");
//tmq_conf_set(conf, "td.connect.port", "6030"); // tmq_conf_set(conf, "td.connect.port", "6030");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
// tmq_conf_set(conf, "group.id", "cgrp1"); // tmq_conf_set(conf, "group.id", "cgrp1");
for (int32_t i = 0; i < pInfo->numOfKey; i++) { for (int32_t i = 0; i < pInfo->numOfKey; i++) {
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
} }
//tmq_conf_set(conf, "client.id", "c-001"); // tmq_conf_set(conf, "client.id", "c-001");
// tmq_conf_set(conf, "enable.auto.commit", "true");
// tmq_conf_set(conf, "enable.auto.commit", "false");
// tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
//tmq_conf_set(conf, "enable.auto.commit", "true"); // tmq_conf_set(conf, "auto.offset.reset", "none");
//tmq_conf_set(conf, "enable.auto.commit", "false"); // tmq_conf_set(conf, "auto.offset.reset", "earliest");
// tmq_conf_set(conf, "auto.offset.reset", "latest");
//tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
//tmq_conf_set(conf, "auto.offset.reset", "none");
//tmq_conf_set(conf, "auto.offset.reset", "earliest");
//tmq_conf_set(conf, "auto.offset.reset", "latest");
pInfo->tmq = tmq_consumer_new(conf, NULL, 0); pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
return; return;
} }
void build_topic_list(SThreadInfo *pInfo) { void build_topic_list(SThreadInfo* pInfo) {
pInfo->topicList = tmq_list_new(); pInfo->topicList = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1"); // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < pInfo->numOfTopic; i++) { for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
...@@ -286,41 +284,37 @@ void build_topic_list(SThreadInfo *pInfo) { ...@@ -286,41 +284,37 @@ void build_topic_list(SThreadInfo *pInfo) {
return; return;
} }
int32_t saveConsumeResult(SThreadInfo *pInfo) { int32_t saveConsumeResult(SThreadInfo* pInfo) {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
g_stConfInfo.cdbName, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->consumeRowCnt,
pInfo->checkresult);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
return 0; return 0;
} }
void loop_consume(SThreadInfo *pInfo) { void loop_consume(SThreadInfo* pInfo) {
tmq_resp_err_t err; tmq_resp_err_t err;
int64_t totalMsgs = 0; int64_t totalMsgs = 0;
int64_t totalRows = 0; int64_t totalRows = 0;
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) { if (tmqMsg) {
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId); totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId);
} }
...@@ -328,7 +322,7 @@ void loop_consume(SThreadInfo *pInfo) { ...@@ -328,7 +322,7 @@ void loop_consume(SThreadInfo *pInfo) {
taos_free_result(tmqMsg); taos_free_result(tmqMsg);
totalMsgs++; totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) { if (totalMsgs >= pInfo->expectMsgCnt) {
break; break;
} }
...@@ -336,7 +330,7 @@ void loop_consume(SThreadInfo *pInfo) { ...@@ -336,7 +330,7 @@ void loop_consume(SThreadInfo *pInfo) {
break; break;
} }
} }
err = tmq_consumer_close(pInfo->tmq); err = tmq_consumer_close(pInfo->tmq);
if (err) { if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
...@@ -346,27 +340,27 @@ void loop_consume(SThreadInfo *pInfo) { ...@@ -346,27 +340,27 @@ void loop_consume(SThreadInfo *pInfo) {
pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeMsgCnt = totalMsgs;
pInfo->consumeRowCnt = totalRows; pInfo->consumeRowCnt = totalRows;
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %"PRId64", consumeRowCnt: %"PRId64"\n", pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
} }
void *consumeThreadFunc(void *param) { void* consumeThreadFunc(void* param) {
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
SThreadInfo *pInfo = (SThreadInfo *)param; SThreadInfo* pInfo = (SThreadInfo*)param;
build_consumer(pInfo); build_consumer(pInfo);
build_topic_list(pInfo); build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){ if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
return NULL; return NULL;
} }
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) { if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
loop_consume(pInfo); loop_consume(pInfo);
tmq_commit(pInfo->tmq, NULL, 0); tmq_commit(pInfo->tmq, NULL, 0);
...@@ -374,10 +368,10 @@ void *consumeThreadFunc(void *param) { ...@@ -374,10 +368,10 @@ void *consumeThreadFunc(void *param) {
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
if (err) { if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1; pInfo->consumeMsgCnt = -1;
return NULL; return NULL;
} }
// save consume result into consumeresult table // save consume result into consumeresult table
saveConsumeResult(pInfo); saveConsumeResult(pInfo);
...@@ -389,7 +383,7 @@ void parseConsumeInfo() { ...@@ -389,7 +383,7 @@ void parseConsumeInfo() {
const char delim[2] = ","; const char delim[2] = ",";
const char ch = ':'; const char ch = ':';
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
token = strtok(g_stConfInfo.stThreads[i].topicString, delim); token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
...@@ -397,10 +391,10 @@ void parseConsumeInfo() { ...@@ -397,10 +391,10 @@ void parseConsumeInfo() {
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.stThreads[i].numOfTopic++; g_stConfInfo.stThreads[i].numOfTopic++;
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
token = strtok(g_stConfInfo.stThreads[i].keyString, delim); token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
...@@ -414,7 +408,7 @@ void parseConsumeInfo() { ...@@ -414,7 +408,7 @@ void parseConsumeInfo() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]); // g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++; g_stConfInfo.stThreads[i].numOfKey++;
} }
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
} }
...@@ -422,48 +416,49 @@ void parseConsumeInfo() { ...@@ -422,48 +416,49 @@ void parseConsumeInfo() {
int32_t getConsumeInfo() { int32_t getConsumeInfo() {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
int num_fields = taos_num_fields(pRes); int num_fields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes); TAOS_FIELD* fields = taos_fetch_fields(pRes);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
// ifcheckdata int
int32_t numOfThread = 0; int32_t numOfThread = 0;
while ((row = taos_fetch_row(pRes))) { while ((row = taos_fetch_row(pRes))) {
int32_t* lengths = taos_fetch_lengths(pRes); int32_t* lengths = taos_fetch_lengths(pRes);
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) { if (row[i] == NULL || 0 == i) {
continue; continue;
} }
if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]); g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
} else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
} else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]); g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]); g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
} }
} }
numOfThread ++; numOfThread++;
} }
g_stConfInfo.numOfThread = numOfThread; g_stConfInfo.numOfThread = numOfThread;
...@@ -474,7 +469,6 @@ int32_t getConsumeInfo() { ...@@ -474,7 +469,6 @@ int32_t getConsumeInfo() {
return 0; return 0;
} }
int main(int32_t argc, char* argv[]) { int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
getConsumeInfo(); getConsumeInfo();
...@@ -485,20 +479,21 @@ int main(int32_t argc, char* argv[]) { ...@@ -485,20 +479,21 @@ int main(int32_t argc, char* argv[]) {
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
// pthread_create one thread to consume // pthread_create one thread to consume
taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i]))); taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
(void*)(&(g_stConfInfo.stThreads[i])));
} }
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
} }
//printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册