提交 18f6fb61 编写于 作者: L Liu Jicong

enh(tmq): speed up consumer recover

上级 fc225a58
...@@ -21,17 +21,17 @@ ...@@ -21,17 +21,17 @@
#include "taos.h" #include "taos.h"
static int running = 1; static int running = 1;
static char dbName[64] = "tmqdb"; static char dbName[64] = "tmqdb";
static char stbName[64] = "stb"; static char stbName[64] = "stb";
static char topicName[64] = "topicname"; static char topicName[64] = "topicname";
static int32_t msg_process(TAOS_RES* msg) { static int32_t msg_process(TAOS_RES* msg) {
char buf[1024]; char buf[1024];
int32_t rows = 0; int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg); const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg); const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg); int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName); printf("topic: %s\n", topicName);
printf("db: %s\n", dbName); printf("db: %s\n", dbName);
...@@ -41,14 +41,14 @@ static int32_t msg_process(TAOS_RES* msg) { ...@@ -41,14 +41,14 @@ static int32_t msg_process(TAOS_RES* msg) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg); int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg); int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg); const char* tbName = tmq_get_table_name(msg);
rows++; rows++;
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf); printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
} }
return rows; return rows;
...@@ -80,7 +80,8 @@ static int32_t init_env() { ...@@ -80,7 +80,8 @@ static int32_t init_env() {
// create super table // create super table
printf("create super table\n"); printf("create super table\n");
pRes = taos_query(pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); pRes = taos_query(
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -166,7 +167,6 @@ int32_t create_topic() { ...@@ -166,7 +167,6 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
// pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1"); pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
...@@ -184,26 +184,28 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { ...@@ -184,26 +184,28 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
tmq_t* build_consumer() { tmq_t* build_consumer() {
tmq_conf_res_t code; tmq_conf_res_t code;
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true"); code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "group.id", "cgrpName"); code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.user", "root"); code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true"); code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true"); code = tmq_conf_set(conf, "msg.with.table.name", "true");
if (TMQ_CONF_OK != code) return NULL; if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
return tmq; return tmq;
...@@ -211,7 +213,7 @@ tmq_t* build_consumer() { ...@@ -211,7 +213,7 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
tmq_list_t* topicList = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
int32_t code = tmq_list_append(topicList, "topicname"); int32_t code = tmq_list_append(topicList, "topicname");
if (code) { if (code) {
return NULL; return NULL;
} }
...@@ -228,18 +230,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) { ...@@ -228,18 +230,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t msgCnt = 0; int32_t msgCnt = 0;
int32_t consumeDelay = 5000; int32_t timeout = 5000;
while (running) { while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, consumeDelay); TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) { if (tmqmsg) {
msgCnt++; msgCnt++;
totalRows += msg_process(tmqmsg); totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg); taos_free_result(tmqmsg);
} else { /*} else {*/
break; /*break;*/
} }
} }
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
} }
...@@ -256,32 +258,30 @@ int main(int argc, char* argv[]) { ...@@ -256,32 +258,30 @@ int main(int argc, char* argv[]) {
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
if (NULL == tmq) { if (NULL == tmq) {
fprintf(stderr, "%% build_consumer() fail!\n"); fprintf(stderr, "%% build_consumer() fail!\n");
return -1; return -1;
} }
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) { if (NULL == topic_list) {
return -1; return -1;
} }
basic_consume_loop(tmq, topic_list); basic_consume_loop(tmq, topic_list);
code = tmq_unsubscribe(tmq); code = tmq_unsubscribe(tmq);
if (code) { if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code)); fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
} } else {
else {
fprintf(stderr, "%% unsubscribe\n"); fprintf(stderr, "%% unsubscribe\n");
} }
code = tmq_consumer_close(tmq); code = tmq_consumer_close(tmq);
if (code) { if (code) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
} } else {
else {
fprintf(stderr, "%% Consumer closed\n"); fprintf(stderr, "%% Consumer closed\n");
} }
return 0; return 0;
} }
...@@ -118,7 +118,7 @@ typedef struct { ...@@ -118,7 +118,7 @@ typedef struct {
int64_t sourceVer; int64_t sourceVer;
int64_t reqId; int64_t reqId;
SArray* blocks; // SArray<SSDataBlock*> SArray* blocks; // SArray<SSDataBlock>
} SStreamDataBlock; } SStreamDataBlock;
typedef struct { typedef struct {
......
...@@ -617,6 +617,7 @@ int32_t* taosGetErrno(); ...@@ -617,6 +617,7 @@ int32_t* taosGetErrno();
//tmq //tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000) #define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -122,6 +122,7 @@ enum { ...@@ -122,6 +122,7 @@ enum {
TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__READY, TMQ_CONSUMER_STATUS__READY,
TMQ_CONSUMER_STATUS__NO_TOPIC, TMQ_CONSUMER_STATUS__NO_TOPIC,
TMQ_CONSUMER_STATUS__RECOVER,
}; };
enum { enum {
...@@ -134,10 +135,8 @@ typedef struct { ...@@ -134,10 +135,8 @@ typedef struct {
// statistics // statistics
int64_t pollCnt; int64_t pollCnt;
// offset // offset
/*int64_t committedOffset;*/ STqOffsetVal committedOffset;
/*int64_t currentOffset;*/ STqOffsetVal currentOffset;
STqOffsetVal committedOffsetNew;
STqOffsetVal currentOffsetNew;
// connection info // connection info
int32_t vgId; int32_t vgId;
int32_t vgStatus; int32_t vgStatus;
...@@ -152,7 +151,6 @@ typedef struct { ...@@ -152,7 +151,6 @@ typedef struct {
SArray* vgs; // SArray<SMqClientVg> SArray* vgs; // SArray<SMqClientVg>
int8_t isSchemaAdaptive;
SSchemaWrapper schema; SSchemaWrapper schema;
} SMqClientTopic; } SMqClientTopic;
...@@ -190,10 +188,9 @@ typedef struct { ...@@ -190,10 +188,9 @@ typedef struct {
} SMqPollCbParam; } SMqPollCbParam;
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
int8_t automatic; int8_t automatic;
int8_t async; int8_t async;
/*int8_t freeOffsets;*/
int32_t waitingRspNum; int32_t waitingRspNum;
int32_t totalRspNum; int32_t totalRspNum;
int32_t rspErr; int32_t rspErr;
...@@ -418,7 +415,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -418,7 +415,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pOffset->val = pVg->currentOffsetNew; pOffset->val = pVg->currentOffset;
int32_t groupLen = strlen(tmq->groupId); int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, groupLen); memcpy(pOffset->subKey, tmq->groupId, groupLen);
...@@ -462,7 +459,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -462,7 +459,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pVg->vgId, pOffset->val.version); pVg->vgId, pOffset->val.version);
// TODO: put into cb // TODO: put into cb
pVg->committedOffsetNew = pVg->currentOffsetNew; pVg->committedOffset = pVg->currentOffset;
pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0; pMsgSendInfo->requestObjRefId = 0;
...@@ -504,7 +501,6 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm ...@@ -504,7 +501,6 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
pParamSet->tmq = tmq; pParamSet->tmq = tmq;
pParamSet->automatic = 0; pParamSet->automatic = 0;
pParamSet->async = async; pParamSet->async = async;
/*pParamSet->freeOffsets = 1;*/
pParamSet->userCb = userCb; pParamSet->userCb = userCb;
pParamSet->userParam = userParam; pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0); tsem_init(&pParamSet->rspSem, 0, 0);
...@@ -518,7 +514,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm ...@@ -518,7 +514,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId != vgId) continue; if (pVg->vgId != vgId) continue;
if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) { if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
goto FAIL; goto FAIL;
} }
...@@ -550,8 +546,8 @@ FAIL: ...@@ -550,8 +546,8 @@ FAIL:
return 0; return 0;
} }
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) { void* userParam) {
int32_t code = -1; int32_t code = -1;
if (msg != NULL) { if (msg != NULL) {
...@@ -566,7 +562,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_ ...@@ -566,7 +562,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
pParamSet->tmq = tmq; pParamSet->tmq = tmq;
pParamSet->automatic = automatic; pParamSet->automatic = automatic;
pParamSet->async = async; pParamSet->async = async;
/*pParamSet->freeOffsets = 1;*/
pParamSet->userCb = userCb; pParamSet->userCb = userCb;
pParamSet->userParam = userParam; pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0); tsem_init(&pParamSet->rspSem, 0, 0);
...@@ -583,7 +578,9 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_ ...@@ -583,7 +578,9 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName, tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
pVg->vgId); pVg->vgId);
if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) { if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
tscDebug("consumer: %ld, vg:%d, current %ld, committed %ld", tmq->consumerId, pVg->vgId,
pVg->currentOffset.version, pVg->committedOffset.version);
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
continue; continue;
} }
...@@ -699,7 +696,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { ...@@ -699,7 +696,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
tmqAskEp(tmq, true); tmqAskEp(tmq, true);
taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer); taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer); taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else { } else {
...@@ -888,12 +885,6 @@ FAIL: ...@@ -888,12 +885,6 @@ FAIL:
return NULL; return NULL;
} }
#if 0
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
}
#endif
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
...@@ -967,7 +958,11 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -967,7 +958,11 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code = param.rspErr; code = param.rspErr;
if (code != 0) goto FAIL; if (code != 0) goto FAIL;
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 10) {
goto FAIL;
}
tscDebug("consumer not ready, retry"); tscDebug("consumer not ready, retry");
taosMsleep(500); taosMsleep(500);
} }
...@@ -1006,8 +1001,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1006,8 +1001,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
int32_t epoch = pParam->epoch; int32_t epoch = pParam->epoch;
taosMemoryFree(pParam); taosMemoryFree(pParam);
if (code != 0) { if (code != 0) {
tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code); tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
if (pMsg->pData) taosMemoryFree(pMsg->pData); if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
goto CREATE_MSG_FAIL;
}
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
...@@ -1083,7 +1082,7 @@ CREATE_MSG_FAIL: ...@@ -1083,7 +1082,7 @@ CREATE_MSG_FAIL:
return -1; return -1;
} }
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
bool set = false; bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
...@@ -1112,10 +1111,10 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { ...@@ -1112,10 +1111,10 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &pVgCur->currentOffsetNew); tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch, tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
pVgCur->vgId, vgKey, buf); pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal)); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
} }
} }
} }
...@@ -1142,93 +1141,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { ...@@ -1142,93 +1141,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0, .pollCnt = 0,
.currentOffsetNew = offsetNew, .currentOffset = offsetNew,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0,
};
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(newTopics, &topic);
}
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
taosHashCleanup(pHash);
tmq->clientTopics = newTopics;
if (taosArrayGetSize(tmq->clientTopics) == 0)
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
else
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
atomic_store_32(&tmq->epoch, epoch);
return set;
}
#if 0
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
topicNumGet);
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) {
return false;
}
SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
if (pHash == NULL) {
taosArrayDestroy(newTopics);
return false;
}
// find topic, build hash
for (int32_t i = 0; i < topicNumGet; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
topic.schema = pTopicEp->schema;
taosHashClear(pHash);
topic.topicName = strdup(pTopicEp->topic);
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) {
// find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
if (vgNumCur == 0) break;
for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
}
break;
}
}
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgNumGet; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset;
tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
if (pOffset != NULL) {
offset = *pOffset;
tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
vgKey);
}
tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = offset,
.vgId = pVgEp->vgId, .vgId = pVgEp->vgId,
.epSet = pVgEp->epSet, .epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE, .vgStatus = TMQ_VG_STATUS__IDLE,
...@@ -1251,7 +1164,6 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { ...@@ -1251,7 +1164,6 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
return set; return set;
} }
#endif
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
...@@ -1278,7 +1190,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1278,7 +1190,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/ /*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/ /*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/
tmqUpdateEp2(tmq, head->epoch, &rsp); tmqUpdateEp(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
} else { } else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM); SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
...@@ -1430,7 +1342,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* ...@@ -1430,7 +1342,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
pReq->consumerId = tmq->consumerId; pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch; pReq->epoch = tmq->epoch;
/*pReq->currentOffset = reqOffset;*/ /*pReq->currentOffset = reqOffset;*/
pReq->reqOffset = pVg->currentOffsetNew; pReq->reqOffset = pVg->currentOffset;
pReq->reqId = generateRequestId(); pReq->reqId = generateRequestId();
pReq->useSnapshot = tmq->useSnapshot; pReq->useSnapshot = tmq->useSnapshot;
...@@ -1534,7 +1446,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1534,7 +1446,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*printf("send poll\n");*/ /*printf("send poll\n");*/
char offsetFormatBuf[80]; char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew); tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64, tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
...@@ -1552,7 +1464,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) ...@@ -1552,7 +1464,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
tmqUpdateEp2(tmq, rspWrapper->epoch, rspMsg); tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/ /*tmqClearUnhandleMsg(tmq);*/
*pReset = true; *pReset = true;
} else { } else {
...@@ -1586,7 +1498,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1586,7 +1498,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/ * rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset; pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) { if (pollRspWrapper->dataRsp.blockNum == 0) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
...@@ -1609,8 +1521,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1609,8 +1521,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/ * rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset; pVg->currentOffset.version = pollRspWrapper->metaRsp.rspOffset;
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG; pVg->currentOffset.type = TMQ_OFFSET__LOG;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp // build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
...@@ -1653,6 +1565,17 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1653,6 +1565,17 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL; return NULL;
} }
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 10) {
return NULL;
}
tscDebug("consumer not ready, retry");
taosMsleep(500);
}
}
while (1) { while (1) {
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) return NULL; if (tmqPollImpl(tmq, timeout) < 0) return NULL;
...@@ -3381,10 +3304,10 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { ...@@ -3381,10 +3304,10 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
// //
tmqCommitInner2(tmq, msg, 0, 1, cb, param); tmqCommitInner(tmq, msg, 0, 1, cb, param);
} }
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
// //
return tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL); return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
} }
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#include "systable.h" #include "systable.h"
#include "taos.h" #include "taos.h"
#include "tdef.h" #include "tdef.h"
#include "types.h"
#include "tgrant.h" #include "tgrant.h"
#include "types.h"
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
...@@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = { ...@@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)}, {TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)}, {TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)}, {TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
// {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)}, // {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)},
// {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)}, // {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
{TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)}, {TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)},
{TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)}, {TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)},
{TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)}, {TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
...@@ -284,7 +284,6 @@ static const SSysDbTableSchema consumerSchema[] = { ...@@ -284,7 +284,6 @@ static const SSysDbTableSchema consumerSchema[] = {
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
......
...@@ -131,8 +131,9 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { ...@@ -131,8 +131,9 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId, mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) { if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1; return -1;
} }
...@@ -275,6 +276,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { ...@@ -275,6 +276,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__LOST_REBD) { if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
mInfo("try to recover consumer %ld", consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId; pRecoverMsg->consumerId = consumerId;
...@@ -305,15 +307,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -305,15 +307,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
#if 1
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
#endif
// 1. check consumer status // 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
#if 1 #if 1
if (status == MQ_CONSUMER_STATUS__LOST_REBD) { if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
mInfo("try to recover consumer %ld", consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId; pRecoverMsg->consumerId = consumerId;
...@@ -326,6 +327,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -326,6 +327,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
#endif #endif
if (status != MQ_CONSUMER_STATUS__READY) { if (status != MQ_CONSUMER_STATUS__READY) {
mInfo("consumer %ld not ready, status: %s", consumerId, mndConsumerStatusName(status));
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1; return -1;
} }
...@@ -939,10 +941,6 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -939,10 +941,6 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
colDataAppend(pColInfo, numOfRows, NULL, true); colDataAppend(pColInfo, numOfRows, NULL, true);
} }
// pid
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
// end point // end point
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true); colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
......
...@@ -157,7 +157,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); ...@@ -157,7 +157,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
......
...@@ -183,7 +183,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -183,7 +183,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0; return 0;
} }
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) {
STqOffset offset = {0}; STqOffset offset = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, msg, msgLen);
...@@ -302,6 +302,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -302,6 +302,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64 tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
", in vgId:%d, subkey %s, handle consumer id %" PRId64, ", in vgId:%d, subkey %s, handle consumer id %" PRId64,
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId); consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
return -1; return -1;
} }
......
...@@ -204,7 +204,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -204,7 +204,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break; break;
case TDMT_VND_MQ_COMMIT_OFFSET: case TDMT_VND_MQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead), version) < 0) {
goto _err; goto _err;
} }
break; break;
......
...@@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -125,7 +125,7 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { ...@@ -125,7 +125,7 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
} }
} }
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem); ASSERT(elem);
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) { if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
...@@ -172,8 +172,8 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -172,8 +172,8 @@ void streamFreeQitem(SStreamQueueItem* data) {
int32_t ref = atomic_sub_fetch_32(pRef, 1); int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0); ASSERT(ref >= 0);
if (ref == 0) { if (ref == 0) {
void* data = taosArrayGetP(pMerge->reqs, i); void* dataStr = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(data); taosMemoryFree(dataStr);
taosMemoryFree(pRef); taosMemoryFree(pRef);
} }
} }
......
...@@ -370,80 +370,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -370,80 +370,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
return 0; return 0;
} }
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
void* buf = NULL;
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(data->blocks);
ASSERT(blockNum != 0);
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.dataSrcVgId = data->srcVgId,
.upstreamTaskId = pTask->taskId,
.upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
};
req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
if (req.data == NULL || req.dataLen == NULL) {
goto FAIL;
}
for (int32_t i = 0; i < blockNum; i++) {
SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i);
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
goto FAIL;
}
}
int32_t vgId = 0;
int32_t downstreamTaskId = 0;
// find ep
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
vgId = pTask->fixedEpDispatcher.nodeId;
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
downstreamTaskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
// TODO get ctbName for each block
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
// TODO: get hash function by hashMethod
// get groupId, compute hash value
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
// get node
// TODO: optimize search process
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
vgId = pVgInfo->vgId;
downstreamTaskId = pVgInfo->taskId;
*ppEpSet = &pVgInfo->epSet;
break;
}
}
}
ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
req.taskId = downstreamTaskId;
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId);
streamDispatchOneReq(pTask, &req, vgId, *ppEpSet);
code = 0;
FAIL:
if (code < 0 && buf) rpcFreeCont(buf);
if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree);
if (req.dataLen) taosArrayDestroy(req.dataLen);
return code;
}
int32_t streamDispatch(SStreamTask* pTask) { int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
...@@ -461,7 +387,7 @@ int32_t streamDispatch(SStreamTask* pTask) { ...@@ -461,7 +387,7 @@ int32_t streamDispatch(SStreamTask* pTask) {
} }
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qDebug("stream continue dispatching: task %d", pTask->taskId); qDebug("stream dispatching: task %d", pTask->taskId);
int32_t code = 0; int32_t code = 0;
if (streamDispatchAllBlocks(pTask, pBlock) < 0) { if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
......
...@@ -82,17 +82,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -82,17 +82,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
return 0; return 0;
} }
#if 0
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
int32_t childId = pBlock->childId;
int64_t ver = pBlock->sourceVer;
SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId);
/*pChildInfo-> = ver;*/
return 0;
}
#endif
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) { int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
...@@ -150,10 +139,11 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) ...@@ -150,10 +139,11 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0; return 0;
} }
// TODO: handle version // TODO: handle version
int32_t streamExecForAll(SStreamTask* pTask) { int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
int32_t cnt = 1; int32_t batchCnt = 1;
void* data = NULL; void* data = NULL;
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
...@@ -169,13 +159,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -169,13 +159,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
} else { } else {
void* newRet; void* newRet;
if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) { if ((newRet = streamMergeQueueItem(data, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
break; break;
} else { } else {
cnt++; batchCnt++;
data = newRet; data = newRet;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
} }
} }
...@@ -198,16 +187,14 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -198,16 +187,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt);
streamTaskExecImpl(pTask, data, pRes); streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId); qDebug("stream task %d exec end", pTask->taskId);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) { if (qRes == NULL) {
// TODO log failed ver taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
streamFreeQitem(data); streamFreeQitem(data);
return -1; return -1;
} }
...@@ -218,17 +205,18 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -218,17 +205,18 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver; qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)data)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver;
} }
if (streamTaskOutput(pTask, qRes) < 0) { if (streamTaskOutput(pTask, qRes) < 0) {
// TODO save failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
streamFreeQitem(data); streamFreeQitem(data);
taosFreeQitem(qRes);
return -1; return -1;
} }
/*streamQueueProcessSuccess(pTask->inputQueue);*/
} else { } else {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
} }
......
...@@ -421,6 +421,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy ...@@ -421,6 +421,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
wDebug("vgId:%d, wal write log %ld, msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType));
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
// TODO ftruncate // TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
......
...@@ -76,8 +76,8 @@ int32_t tsem_wait(tsem_t* sem) { ...@@ -76,8 +76,8 @@ int32_t tsem_wait(tsem_t* sem) {
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
struct timespec ts, rel; struct timespec ts, rel;
FILETIME ft_before, ft_after; FILETIME ft_before, ft_after;
int rc; int rc;
rel.tv_sec = 0; rel.tv_sec = 0;
rel.tv_nsec = nanosecs; rel.tv_nsec = nanosecs;
...@@ -217,7 +217,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -217,7 +217,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// int e = errno; // int e = errno;
// if (e == EEXIST) continue; // if (e == EEXIST) continue;
// if (e == EINTR) continue; // if (e == EINTR) continue;
// fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, // fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem,
// e, strerror(e)); // e, strerror(e));
// abort(); // abort();
// } while (p->sem == SEM_FAILED); // } while (p->sem == SEM_FAILED);
...@@ -231,7 +232,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -231,7 +232,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// } // }
// kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value); // kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
// if (ret != KERN_SUCCESS) { // if (ret != KERN_SUCCESS) {
// fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__,
// sem); // sem);
// // we fail-fast here, because we have less-doc about semaphore_create for the moment // // we fail-fast here, because we have less-doc about semaphore_create for the moment
// abort(); // abort();
...@@ -258,8 +260,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -258,8 +260,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// } // }
// struct tsem_s *p = *sem; // struct tsem_s *p = *sem;
// if (!p->valid) { // if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// abort(); // sem); abort();
// } // }
// #ifdef SEM_USE_PTHREAD // #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) { // if (taosThreadMutexLock(&p->lock)) {
...@@ -270,7 +272,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -270,7 +272,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// p->val -= 1; // p->val -= 1;
// if (p->val < 0) { // if (p->val < 0) {
// if (taosThreadCondWait(&p->cond, &p->lock)) { // if (taosThreadCondWait(&p->cond, &p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__,
// sem); // sem);
// abort(); // abort();
// } // }
...@@ -297,8 +300,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -297,8 +300,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// } // }
// struct tsem_s *p = *sem; // struct tsem_s *p = *sem;
// if (!p->valid) { // if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// abort(); // sem); abort();
// } // }
// #ifdef SEM_USE_PTHREAD // #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) { // if (taosThreadMutexLock(&p->lock)) {
...@@ -309,7 +312,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -309,7 +312,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// p->val += 1; // p->val += 1;
// if (p->val <= 0) { // if (p->val <= 0) {
// if (taosThreadCondSignal(&p->cond)) { // if (taosThreadCondSignal(&p->cond)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__,
// sem); // sem);
// abort(); // abort();
// } // }
...@@ -332,7 +336,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -332,7 +336,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// int tsem_destroy(tsem_t *sem) { // int tsem_destroy(tsem_t *sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// if (!*sem) { // if (!*sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// // abort(); // // abort();
// return 0; // return 0;
// } // }
...@@ -370,7 +375,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -370,7 +375,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// int r = sem_unlink(name); // int r = sem_unlink(name);
// if (r) { // if (r) {
// int e = errno; // int e = errno;
// fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, // fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem,
// e, strerror(e)); // e, strerror(e));
// abort(); // abort();
// } // }
...@@ -385,225 +391,189 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { ...@@ -385,225 +391,189 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// *sem = NULL; // *sem = NULL;
// return 0; // return 0;
// } // }
typedef struct typedef struct {
{ pthread_mutex_t count_lock;
pthread_mutex_t count_lock; pthread_cond_t count_bump;
pthread_cond_t count_bump; unsigned int count;
unsigned int count; } bosal_sem_t;
}bosal_sem_t;
int tsem_init(tsem_t *psem, int flags, unsigned int count) {
int tsem_init(tsem_t *psem, int flags, unsigned int count) bosal_sem_t *pnewsem;
{ int result;
bosal_sem_t *pnewsem;
int result; pnewsem = (bosal_sem_t *)malloc(sizeof(bosal_sem_t));
if (!pnewsem) {
pnewsem = (bosal_sem_t *)malloc(sizeof(bosal_sem_t)); return -1;
if (! pnewsem) }
{ result = pthread_mutex_init(&pnewsem->count_lock, NULL);
return -1; if (result) {
} free(pnewsem);
result = pthread_mutex_init(&pnewsem->count_lock, NULL); return result;
if (result) }
{ result = pthread_cond_init(&pnewsem->count_bump, NULL);
free(pnewsem); if (result) {
return result; pthread_mutex_destroy(&pnewsem->count_lock);
} free(pnewsem);
result = pthread_cond_init(&pnewsem->count_bump, NULL); return result;
if (result) }
{ pnewsem->count = count;
pthread_mutex_destroy(&pnewsem->count_lock); *psem = (tsem_t)pnewsem;
free(pnewsem); return 0;
return result;
}
pnewsem->count = count;
*psem = (tsem_t)pnewsem;
return 0;
} }
int tsem_destroy(tsem_t *psem) int tsem_destroy(tsem_t *psem) {
{ bosal_sem_t *poldsem;
bosal_sem_t *poldsem;
if (! psem) if (!psem) {
{ return EINVAL;
return EINVAL; }
} poldsem = (bosal_sem_t *)*psem;
poldsem = (bosal_sem_t *)*psem;
pthread_mutex_destroy(&poldsem->count_lock); pthread_mutex_destroy(&poldsem->count_lock);
pthread_cond_destroy(&poldsem->count_bump); pthread_cond_destroy(&poldsem->count_bump);
free(poldsem); free(poldsem);
return 0; return 0;
} }
int tsem_post(tsem_t *psem) int tsem_post(tsem_t *psem) {
{ bosal_sem_t *pxsem;
bosal_sem_t *pxsem; int result, xresult;
int result, xresult;
if (! psem) if (!psem) {
{ return EINVAL;
return EINVAL; }
} pxsem = (bosal_sem_t *)*psem;
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock); result = pthread_mutex_lock(&pxsem->count_lock);
if (result) if (result) {
{ return result;
return result; }
} pxsem->count = pxsem->count + 1;
pxsem->count = pxsem->count + 1;
xresult = pthread_cond_signal(&pxsem->count_bump); xresult = pthread_cond_signal(&pxsem->count_bump);
result = pthread_mutex_unlock(&pxsem->count_lock); result = pthread_mutex_unlock(&pxsem->count_lock);
if (result) if (result) {
{ return result;
return result; }
} if (xresult) {
if (xresult) errno = xresult;
{ return -1;
errno = xresult; }
return -1; return 0;
}
return 0;
} }
int tsem_trywait(tsem_t *psem) int tsem_trywait(tsem_t *psem) {
{ bosal_sem_t *pxsem;
bosal_sem_t *pxsem; int result, xresult;
int result, xresult;
if (! psem) if (!psem) {
{ return EINVAL;
return EINVAL; }
} pxsem = (bosal_sem_t *)*psem;
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock); result = pthread_mutex_lock(&pxsem->count_lock);
if (result) if (result) {
{ return result;
return result; }
} xresult = 0;
xresult = 0;
if (pxsem->count > 0) if (pxsem->count > 0) {
{ pxsem->count--;
pxsem->count--; } else {
} xresult = EAGAIN;
else }
{ result = pthread_mutex_unlock(&pxsem->count_lock);
xresult = EAGAIN; if (result) {
} return result;
result = pthread_mutex_unlock(&pxsem->count_lock); }
if (result) if (xresult) {
{ errno = xresult;
return result; return -1;
} }
if (xresult) return 0;
{
errno = xresult;
return -1;
}
return 0;
} }
int tsem_wait(tsem_t *psem) int tsem_wait(tsem_t *psem) {
{ bosal_sem_t *pxsem;
bosal_sem_t *pxsem; int result, xresult;
int result, xresult;
if (! psem) if (!psem) {
{ return EINVAL;
return EINVAL; }
} pxsem = (bosal_sem_t *)*psem;
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock); result = pthread_mutex_lock(&pxsem->count_lock);
if (result) if (result) {
{ return result;
return result; }
} xresult = 0;
xresult = 0;
if (pxsem->count == 0) if (pxsem->count == 0) {
{ xresult = pthread_cond_wait(&pxsem->count_bump, &pxsem->count_lock);
xresult = pthread_cond_wait(&pxsem->count_bump, &pxsem->count_lock); }
} if (!xresult) {
if (! xresult) if (pxsem->count > 0) {
{ pxsem->count--;
if (pxsem->count > 0)
{
pxsem->count--;
}
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
} }
return 0; }
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result) {
return result;
}
if (xresult) {
errno = xresult;
return -1;
}
return 0;
} }
int tsem_timewait(tsem_t *psem, int64_t nanosecs) int tsem_timewait(tsem_t *psem, int64_t nanosecs) {
{
struct timespec abstim = { struct timespec abstim = {
.tv_sec = 0, .tv_sec = 0,
.tv_nsec = nanosecs, .tv_nsec = nanosecs,
}; };
bosal_sem_t *pxsem; bosal_sem_t *pxsem;
int result, xresult; int result, xresult;
if (! psem) if (!psem) {
{ return EINVAL;
return EINVAL; }
} pxsem = (bosal_sem_t *)*psem;
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock); result = pthread_mutex_lock(&pxsem->count_lock);
if (result) if (result) {
{ return result;
return result; }
} xresult = 0;
xresult = 0;
if (pxsem->count == 0) if (pxsem->count == 0) {
{ xresult = pthread_cond_timedwait(&pxsem->count_bump, &pxsem->count_lock, &abstim);
xresult = pthread_cond_timedwait(&pxsem->count_bump, &pxsem->count_lock, &abstim); }
} if (!xresult) {
if (! xresult) if (pxsem->count > 0) {
{ pxsem->count--;
if (pxsem->count > 0)
{
pxsem->count--;
}
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
} }
return 0; }
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result) {
return result;
}
if (xresult) {
errno = xresult;
return -1;
}
return 0;
} }
bool taosCheckPthreadValid(TdThread thread) { bool taosCheckPthreadValid(TdThread thread) {
int32_t ret = taosThreadKill(thread, 0); int32_t ret = taosThreadKill(thread, 0);
if (ret == ESRCH) return false; if (ret == ESRCH) return false;
if (ret == EINVAL) return false; if (ret == EINVAL) return false;
// alive // alive
return true; return true;
} }
int64_t taosGetSelfPthreadId() { int64_t taosGetSelfPthreadId() {
TdThread thread = taosThreadSelf(); TdThread thread = taosThreadSelf();
...@@ -650,7 +620,13 @@ int64_t taosGetSelfPthreadId() { ...@@ -650,7 +620,13 @@ int64_t taosGetSelfPthreadId() {
int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; } int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }
void taosResetPthread(TdThread* thread) { *thread = 0; } void taosResetPthread(TdThread* thread) { *thread = 0; }
bool taosComparePthread(TdThread first, TdThread second) { return first == second; } bool taosComparePthread(TdThread first, TdThread second) { return first == second; }
int32_t taosGetPId() { return getpid(); }
int32_t taosGetPId() {
static __thread int32_t pid = 0;
if (pid != 0) return pid;
pid = getpid();
return pid;
}
int32_t taosGetAppName(char* name, int32_t* len) { int32_t taosGetAppName(char* name, int32_t* len) {
const char* self = "/proc/self/exe"; const char* self = "/proc/self/exe";
......
...@@ -621,6 +621,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file" ...@@ -621,6 +621,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file"
//tmq //tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册