提交 c5874ca8 编写于 作者: wmmhello's avatar wmmhello

fix: add filter logic for tmq in stable wal

上级 ed6b49c7
......@@ -2073,6 +2073,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc
// TDMT_VND_DROP_TABLE =================
typedef struct {
char* name;
uint64_t suid; // for tmq in wal format
int8_t igNotExists;
} SVDropTbReq;
......
......@@ -5141,6 +5141,7 @@ static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeU64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->igNotExists) < 0) return -1;
tEndEncode(pCoder);
......@@ -5151,6 +5152,7 @@ static int32_t tDecodeSVDropTbReq(SDecoder *pCoder, SVDropTbReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeU64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(pCoder);
......
......@@ -18,12 +18,25 @@
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
if (tEncodeI8(pEncoder, pHandle->fetchMeta) < 0) return -1;
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1;
if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
if (tEncodeI32(pEncoder, size) < 0) return -1;
void *pIter = NULL;
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
while(pIter){
int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL);
if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
}
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
......@@ -32,12 +45,25 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
if (tDecodeI8(pDecoder, &pHandle->fetchMeta) < 0) return -1;
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
}else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
for(int32_t i = 0; i < size; i++){
int64_t tbUid = 0;
if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
}
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
......@@ -267,14 +293,28 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
handle.execHandle.task =
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
}
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList);
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
handle.execHandle.task =
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
}
......
......@@ -15,6 +15,162 @@
#include "tq.h"
bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){
return true;
}
int16_t msgType = pHead->msgType;
char* body = pHead->body;
int32_t bodyLen = pHead->bodyLen;
int64_t tbSuid = pHandle->execHandle.execTb.suid;
int64_t realTbSuid = 0;
SDecoder coder;
void* data = POINTER_SHIFT(body, sizeof(SMsgHead));
int32_t len = bodyLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
SVCreateStbReq req = {0};
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto end;
}
realTbSuid = req.suid;
} else if (msgType == TDMT_VND_DROP_STB) {
SVDropStbReq req = {0};
if (tDecodeSVDropStbReq(&coder, &req) < 0) {
goto end;
}
realTbSuid = req.suid;
} else if (msgType == TDMT_VND_CREATE_TABLE) {
SVCreateTbBatchReq req = {0};
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
goto end;
}
int32_t needRebuild = 0;
SVCreateTbReq* pCreateReq = NULL;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){
needRebuild++;
}
}
if(needRebuild == 0){
// do nothing
}else if(needRebuild == req.nReqs){
realTbSuid = tbSuid;
}else{
realTbSuid = tbSuid;
SVCreateTbBatchReq reqNew = {0};
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){
reqNew.nReqs++;
taosArrayPush(reqNew.pArray, pCreateReq);
}
}
int tlen;
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
void* buf = taosMemoryMalloc(tlen);
if (NULL == buf) {
taosArrayDestroy(reqNew.pArray);
goto end;
}
SEncoder coderNew = {0};
tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
tEncoderClear(&coderNew);
memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
pHead->bodyLen = tlen + sizeof(SMsgHead);
taosMemoryFree(buf);
taosArrayDestroy(reqNew.pArray);
}
} else if (msgType == TDMT_VND_ALTER_TABLE) {
SVAlterTbReq req = {0};
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
goto end;
}
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->execHandle.pExecReader->pVnodeMeta, 0);
if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
metaReaderClear(&mr);
goto end;
}
realTbSuid = mr.me.ctbEntry.suid;
metaReaderClear(&mr);
} else if (msgType == TDMT_VND_DROP_TABLE) {
SVDropTbBatchReq req = {0};
if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
goto end;
}
int32_t needRebuild = 0;
SVDropTbReq* pDropReq = NULL;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
if(pDropReq->suid == tbSuid){
needRebuild++;
}
}
if(needRebuild == 0){
// do nothing
}else if(needRebuild == req.nReqs){
realTbSuid = tbSuid;
}else{
realTbSuid = tbSuid;
SVDropTbBatchReq reqNew = {0};
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
if(pDropReq->suid == tbSuid){
reqNew.nReqs++;
taosArrayPush(reqNew.pArray, pDropReq);
}
}
int tlen;
int32_t ret = 0;
tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
void* buf = taosMemoryMalloc(tlen);
if (NULL == buf) {
taosArrayDestroy(reqNew.pArray);
goto end;
}
SEncoder coderNew = {0};
tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
tEncoderClear(&coderNew);
memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
pHead->bodyLen = tlen + sizeof(SMsgHead);
taosMemoryFree(buf);
taosArrayDestroy(reqNew.pArray);
}
} else if (msgType == TDMT_VND_DELETE) {
SDeleteRes req = {0};
if (tDecodeDeleteRes(&coder, &req) < 0) {
goto end;
}
realTbSuid = req.suid;
} else{
ASSERT(0);
}
end:
tDecoderClear(&coder);
return tbSuid == realTbSuid;
}
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex);
......@@ -53,9 +209,11 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code = -1;
goto END;
}
*fetchOffset = offset;
code = 0;
goto END;
if(isValValidForTable(pHandle, pHead)){
*fetchOffset = offset;
code = 0;
goto END;
}
}
}
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
......
......@@ -6351,8 +6351,8 @@ typedef struct SVgroupDropTableBatch {
char dbName[TSDB_DB_NAME_LEN];
} SVgroupDropTableBatch;
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo) {
SVDropTbReq req = {.name = pClause->tableName, .igNotExists = pClause->ignoreNotExists};
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo, uint64_t suid) {
SVDropTbReq req = {.name = pClause->tableName, .suid = suid, .igNotExists = pClause->ignoreNotExists};
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
if (NULL == pTableBatch) {
SVgroupDropTableBatch tBatch = {0};
......@@ -6393,7 +6393,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info);
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info, pTableMeta->suid);
}
over:
......
......@@ -372,7 +372,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code;
ASSERT(pRead->curVersion == pHead->head.version);
// ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
......
......@@ -26,6 +26,7 @@ TdFilePtr g_fp = NULL;
typedef struct{
bool snapShot;
bool dropTable;
bool subTable;
int srcVgroups;
int dstVgroups;
char dir[64];
......@@ -74,57 +75,7 @@ static void msg_process(TAOS_RES* msg) {
taos_close(pConn);
}
int32_t init_env(Config *conf) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
char sql[128] = {0};
snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", conf->dstVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
snprintf(sql, 128, "create database if not exists abc1 vgroups %d", conf->srcVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
......@@ -232,7 +183,7 @@ int32_t init_env(Config *conf) {
}
taos_free_result(pRes);
if(conf->dropTable){
if(g_conf.dropTable){
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
......@@ -297,7 +248,7 @@ int32_t init_env(Config *conf) {
}
taos_free_result(pRes);
if(conf->dropTable){
if(g_conf.dropTable){
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
......@@ -341,7 +292,7 @@ int32_t init_env(Config *conf) {
}
taos_free_result(pRes);
if(conf->dropTable){
if(g_conf.dropTable){
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
......@@ -358,6 +309,112 @@ int32_t init_env(Config *conf) {
}
taos_free_result(pRes);
}
return 0;
}
int buildStable(TAOS* pConn, TAOS_RES* pRes){
pRes = taos_query(pConn, "CREATE STABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` VARCHAR(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1 using meters tags(2, 'Beijing')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stream meters_summary_s into meters_summary as select _wstart, max(current) as current, groupid, location from meters partition by groupid, location interval(10m)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters_summary, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
char sql[128] = {0};
snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", g_conf.dstVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists meters_summary_t1");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
snprintf(sql, 128, "create database if not exists abc1 vgroups %d", g_conf.srcVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
if(g_conf.subTable){
buildStable(pConn, pRes);
}else{
buildDatabase(pConn, pRes);
}
taos_close(pConn);
return 0;
}
......@@ -377,12 +434,21 @@ int32_t create_topic() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_db with meta as database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
return -1;
if(g_conf.subTable){
pRes = taos_query(pConn, "create topic meters_summary_t1 with meta as stable meters_summary");
if (taos_errno(pRes) != 0) {
printf("failed to create topic meters_summary_t1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}else{
pRes = taos_query(pConn, "create topic topic_db with meta as database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
......@@ -392,7 +458,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer(Config *config) {
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
......@@ -402,7 +468,7 @@ tmq_t* build_consumer(Config *config) {
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "enable.heartbeat.background", "true");
if(config->snapShot){
if(g_conf.snapShot){
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
}
......@@ -415,7 +481,11 @@ tmq_t* build_consumer(Config *config) {
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_db");
if(g_conf.subTable){
tmq_list_append(topic_list, "meters_summary_t1");
}else{
tmq_list_append(topic_list, "topic_db");
}
return topic_list;
}
......@@ -446,16 +516,16 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf(stderr, "%% Consumer closed\n");
}
void initLogFile(Config *conf) {
void initLogFile() {
char f1[256] = {0};
char f2[256] = {0};
if(conf->snapShot){
sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", conf->dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", conf->dir);
if(g_conf.snapShot){
sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", g_conf.dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", g_conf.dir);
}else{
sprintf(f1, "%s/../log/tmq_taosx_tmp.source", conf->dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp.result", conf->dir);
sprintf(f1, "%s/../log/tmq_taosx_tmp.source", g_conf.dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp.result", g_conf.dir);
}
TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM);
......@@ -471,7 +541,7 @@ void initLogFile(Config *conf) {
exit(-1);
}
if(conf->snapShot){
if(g_conf.snapShot){
char *result[] = {
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
......@@ -531,20 +601,22 @@ int main(int argc, char* argv[]) {
g_conf.srcVgroups = atol(argv[++i]);
}else if(strcmp(argv[i], "-dv") == 0){
g_conf.dstVgroups = atol(argv[++i]);
}else if(strcmp(argv[i], "-t") == 0){
g_conf.subTable = true;
}
}
printf("env init\n");
if(strlen(g_conf.dir) != 0){
initLogFile(&g_conf);
initLogFile();
}
if (init_env(&g_conf) < 0) {
if (init_env() < 0) {
return -1;
}
create_topic();
tmq_t* tmq = build_consumer(&g_conf);
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
taosCloseFile(&g_fp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册