提交 79494364 编写于 作者: L Liu Jicong

refactor(tmq): offset storage

上级 d10bfa4c
......@@ -47,7 +47,7 @@ int32_t init_env() {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -167,7 +167,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
......@@ -201,6 +201,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
}
......@@ -239,7 +240,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
msg_process(tmqmessage);
taos_free_result(tmqmessage);
/*tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);*/
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
......
......@@ -126,47 +126,47 @@ typedef struct setConfRet {
char retMsg[RET_MSG_LENGTH];
} setConfRet;
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT int taos_init(void);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT setConfRet taos_set_config(const char *config);
DLL_EXPORT int taos_init(void);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
......@@ -181,8 +181,8 @@ DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnInde
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
......@@ -192,8 +192,8 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param);
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES* res);
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
// Shuduo: temporary enable for app build
#if 1
......@@ -240,11 +240,10 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
// timeout: -1 means infinitely waiting
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
......@@ -275,11 +274,6 @@ DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
#if 0
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
#endif
/* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build
......
......@@ -203,10 +203,10 @@ typedef struct {
int32_t totalRspNum;
tmq_resp_err_t rspErr;
tmq_commit_cb* userCb;
SArray* successfulOffsets;
SArray* failedOffsets;
void* userParam;
tsem_t rspSem;
/*SArray* successfulOffsets;*/
/*SArray* failedOffsets;*/
void* userParam;
tsem_t rspSem;
} SMqCommitCbParamSet;
typedef struct {
......@@ -368,11 +368,13 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array
#if 0
if (code == 0) {
taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
} else {
taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
}
#endif
// count down waiting rsp
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
......@@ -388,10 +390,14 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
// sem post
pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam);
}
} else {
tsem_post(&pParamSet->rspSem);
}
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
}
return 0;
}
......@@ -417,11 +423,17 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, i);
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pVg->currentOffset < 0) {
/*if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {*/
continue;
}
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pOffset->type = TMQ_OFFSET__LOG;
pOffset->version = pVg->currentOffset;
int32_t tlen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, tlen);
pOffset->subKey[tlen] = TMQ_SEPARATOR;
......@@ -454,25 +466,33 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = len,
.len = sizeof(SMsgHead) + len,
.handle = NULL,
};
// TODO: put into cb
pVg->committedOffset = pVg->currentOffset;
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
pParamSet->waitingRspNum++;
pParamSet->totalRspNum++;
}
}
if (pParamSet->totalRspNum == 0) {
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
return 0;
}
if (!async) {
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
......@@ -489,10 +509,12 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8
}
}
#if 0
if (!async) {
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
#endif
return 0;
}
......@@ -648,7 +670,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
tmqAskEp(tmq, true);
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else {
......@@ -819,7 +841,7 @@ FAIL:
}
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
}
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
......@@ -1076,6 +1098,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
return set;
}
#if 0
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/
bool set = false;
......@@ -1160,6 +1183,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
atomic_store_32(&tmq->epoch, epoch);
return set;
}
#endif
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
......@@ -1186,7 +1210,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
tmqUpdateEp(tmq, head->epoch, &rsp);
tmqUpdateEp2(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
} else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
......@@ -1311,10 +1335,10 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset;
} else {
if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
tscError("unable to poll since no committed offset but reset offset is set to none");
return NULL;
}
/*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/
/*tscError("unable to poll since no committed offset but reset offset is set to none");*/
/*return NULL;*/
/*}*/
reqOffset = tmq->resetOffsetCfg;
}
......@@ -1440,7 +1464,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
tmqUpdateEp2(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/
*pReset = true;
} else {
......@@ -1608,9 +1632,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
}
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
tmqCommitInner(tmq, offsets, 0, 1, cb, param);
tmqCommitInner2(tmq, offsets, 0, 1, cb, param);
}
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
return tmqCommitInner2(tmq, offsets, 0, 0, NULL, NULL);
}
......@@ -149,16 +149,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t code = 0;
// get offset to fetch message
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetCommittedVer(pTq->pWal);
} else {
if (pReq->currentOffset >= 0) {
fetchOffset = pReq->currentOffset + 1;
} else {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
if (pOffset != NULL) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
fetchOffset = pOffset->version + 1;
} else {
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetCommittedVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId,
pTq->pVnode->config.vgId, pReq->subKey);
return -1;
}
}
}
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %ld fetch offset %ld", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/
......
......@@ -15,7 +15,4 @@
#include "tq.h"
int tqCommit(STQ* pTq) {
// do nothing
return 0;
}
int tqCommit(STQ* pTq) { return tqOffsetSnapshot(pTq->pOffsetStore); }
......@@ -92,6 +92,8 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
}
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
ASSERT(pOffset->version >= 0);
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
}
......@@ -129,7 +131,7 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
tEncodeSTqOffset(&encoder, pOffset);
// write file
int64_t writeLen;
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != bodyLen) {
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
ASSERT(0);
tqError("write offset incomplete, len %d, write len %ld", bodyLen, writeLen);
taosHashCancelIterate(pStore->pHash, pIter);
......
......@@ -62,7 +62,7 @@ typedef struct {
tmq_t* tmq;
tmq_list_t* topicList;
int32_t numOfVgroups;
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
int64_t ts;
......@@ -74,7 +74,7 @@ typedef struct {
char cdbName[32];
char dbName[32];
int32_t showMsgFlag;
int32_t showRowFlag;
int32_t showRowFlag;
int32_t saveRowFlag;
int32_t consumeDelay; // unit s
int32_t numOfThread;
......@@ -108,26 +108,20 @@ static void printHelp() {
}
char* getCurrentTimeString(char* timeString) {
time_t tTime = taosGetTimestampSec();
time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL);
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
tm.tm_min, tm.tm_sec);
return timeString;
}
void initLogFile() {
char filename[256];
char tmpString[128];
char tmpString[128];
sprintf(filename,"%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
sprintf(filename, "%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
#ifdef WINDOWS
for (int i = 2; i < sizeof(filename); i++) {
if (filename[i] == ':') filename[i] = '-';
......@@ -249,17 +243,18 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
for (i = 0; i < pInfo->numOfVgroups; i++) {
if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
pInfo->rowsOfPerVgroups[i][1] += rows;
return;
}
return;
}
}
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
pInfo->numOfVgroups++;
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId, pInfo->numOfVgroups, vgroupId);
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
pInfo->numOfVgroups, vgroupId);
taosCloseFile(&g_fp);
exit(-1);
}
......@@ -277,7 +272,8 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
sprintf(sqlStr, "insert into %s.content_%d values (%"PRId64", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->ts++, buf);
sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
pInfo->ts++, buf);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
......@@ -295,12 +291,13 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
char buf[1024];
int32_t totalRows = 0;
// printf("topic: %s\n", tmq_get_topic_name(msg));
int32_t vgroupId = tmq_get_vgroup_id(msg);
taosFprintfFile(g_fp, "msg index:%" PRId64 ", consumerId: %d\n", msgIndex, pInfo->consumerId);
//taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId, tmq_get_table_name(msg));
// taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId,
// tmq_get_table_name(msg));
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), vgroupId);
while (1) {
......@@ -316,9 +313,9 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
const char* tbName = tmq_get_table_name(msg);
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName:"null table"), totalRows, buf);
if (0 != g_stConfInfo.saveRowFlag) {
saveConsumeContentToTbl(pInfo, buf);
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
if (0 != g_stConfInfo.saveRowFlag) {
saveConsumeContentToTbl(pInfo, buf);
}
}
......@@ -326,7 +323,7 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
}
addRowsToVgroupId(pInfo, vgroupId, totalRows);
return totalRows;
}
......@@ -401,16 +398,11 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
int64_t now = taosGetTimestampMs();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)",
g_stConfInfo.cdbName,
now,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->consumeRowCnt,
pInfo->checkresult);
sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
g_stConfInfo.cdbName, now, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
char tmpString[128];
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
......@@ -421,7 +413,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
taos_free_result(pRes);
#if 0
#if 0
// vgroups
for (i = 0; i < pInfo->numOfVgroups; i++) {
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
......@@ -445,7 +437,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
taos_free_result(pRes);
}
#endif
#endif
return 0;
}
......@@ -457,7 +449,8 @@ void loop_consume(SThreadInfo* pInfo) {
int64_t totalRows = 0;
char tmpString[128];
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
pInfo->consumerId);
pInfo->ts = taosGetTimestampMs();
......@@ -473,7 +466,7 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (totalRows >= pInfo->expectMsgCnt) {
char tmpString[128];
char tmpString[128];
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
break;
}
......@@ -519,6 +512,7 @@ void* consumeThreadFunc(void* param) {
pPrint("tmq_commit() manual commit when consume end.\n");
/*tmq_commit(pInfo->tmq, NULL, 0);*/
tmq_commit_sync(pInfo->tmq, NULL);
taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
}
err = tmq_unsubscribe(pInfo->tmq);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册