未验证 提交 51aa9413 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15937 from taosdata/feature/stream

enh(tmq): speed up consumer recover
......@@ -21,17 +21,17 @@
#include "taos.h"
static int running = 1;
static char dbName[64] = "tmqdb";
static char stbName[64] = "stb";
static char dbName[64] = "tmqdb";
static char stbName[64] = "stb";
static char topicName[64] = "topicname";
static int32_t msg_process(TAOS_RES* msg) {
char buf[1024];
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
......@@ -41,14 +41,14 @@ static int32_t msg_process(TAOS_RES* msg) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
}
return rows;
......@@ -80,7 +80,8 @@ static int32_t init_env() {
// create super table
printf("create super table\n");
pRes = taos_query(pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
pRes = taos_query(
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -166,7 +167,6 @@ int32_t create_topic() {
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
......@@ -184,26 +184,28 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
tmq_t* build_consumer() {
tmq_conf_res_t code;
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_t* conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true");
if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return tmq;
......@@ -211,7 +213,7 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() {
tmq_list_t* topicList = tmq_list_new();
int32_t code = tmq_list_append(topicList, "topicname");
int32_t code = tmq_list_append(topicList, "topicname");
if (code) {
return NULL;
}
......@@ -228,18 +230,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t consumeDelay = 5000;
int32_t timeout = 5000;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, consumeDelay);
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
} else {
break;
}
/*} else {*/
/*break;*/
}
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
......@@ -256,32 +258,30 @@ int main(int argc, char* argv[]) {
tmq_t* tmq = build_consumer();
if (NULL == tmq) {
fprintf(stderr, "%% build_consumer() fail!\n");
fprintf(stderr, "%% build_consumer() fail!\n");
return -1;
}
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
return -1;
}
}
basic_consume_loop(tmq, topic_list);
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
}
else {
} else {
fprintf(stderr, "%% unsubscribe\n");
}
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
}
else {
} else {
fprintf(stderr, "%% Consumer closed\n");
}
return 0;
}
......@@ -118,7 +118,7 @@ typedef struct {
int64_t sourceVer;
int64_t reqId;
SArray* blocks; // SArray<SSDataBlock*>
SArray* blocks; // SArray<SSDataBlock>
} SStreamDataBlock;
typedef struct {
......
......@@ -619,6 +619,7 @@ int32_t* taosGetErrno();
//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#ifdef __cplusplus
}
......
......@@ -122,6 +122,7 @@ enum {
TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__READY,
TMQ_CONSUMER_STATUS__NO_TOPIC,
TMQ_CONSUMER_STATUS__RECOVER,
};
enum {
......@@ -134,10 +135,8 @@ typedef struct {
// statistics
int64_t pollCnt;
// offset
/*int64_t committedOffset;*/
/*int64_t currentOffset;*/
STqOffsetVal committedOffsetNew;
STqOffsetVal currentOffsetNew;
STqOffsetVal committedOffset;
STqOffsetVal currentOffset;
// connection info
int32_t vgId;
int32_t vgStatus;
......@@ -152,7 +151,6 @@ typedef struct {
SArray* vgs; // SArray<SMqClientVg>
int8_t isSchemaAdaptive;
SSchemaWrapper schema;
} SMqClientTopic;
......@@ -190,10 +188,9 @@ typedef struct {
} SMqPollCbParam;
typedef struct {
tmq_t* tmq;
int8_t automatic;
int8_t async;
/*int8_t freeOffsets;*/
tmq_t* tmq;
int8_t automatic;
int8_t async;
int32_t waitingRspNum;
int32_t totalRspNum;
int32_t rspErr;
......@@ -418,7 +415,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pOffset->val = pVg->currentOffsetNew;
pOffset->val = pVg->currentOffset;
int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, groupLen);
......@@ -462,7 +459,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pVg->vgId, pOffset->val.version);
// TODO: put into cb
pVg->committedOffsetNew = pVg->currentOffsetNew;
pVg->committedOffset = pVg->currentOffset;
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
......@@ -504,7 +501,6 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
pParamSet->tmq = tmq;
pParamSet->automatic = 0;
pParamSet->async = async;
/*pParamSet->freeOffsets = 1;*/
pParamSet->userCb = userCb;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
......@@ -518,7 +514,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId != vgId) continue;
if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
goto FAIL;
}
......@@ -550,8 +546,8 @@ FAIL:
return 0;
}
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
int32_t code = -1;
if (msg != NULL) {
......@@ -566,7 +562,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
pParamSet->tmq = tmq;
pParamSet->automatic = automatic;
pParamSet->async = async;
/*pParamSet->freeOffsets = 1;*/
pParamSet->userCb = userCb;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
......@@ -583,7 +578,9 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
pVg->vgId);
if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
tscDebug("consumer: %ld, vg:%d, current %ld, committed %ld", tmq->consumerId, pVg->vgId,
pVg->currentOffset.version, pVg->committedOffset.version);
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
continue;
}
......@@ -699,7 +696,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
tmqAskEp(tmq, true);
taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else {
......@@ -888,12 +885,6 @@ FAIL:
return NULL;
}
#if 0
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
}
#endif
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
......@@ -967,7 +958,11 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code = param.rspErr;
if (code != 0) goto FAIL;
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 10) {
goto FAIL;
}
tscDebug("consumer not ready, retry");
taosMsleep(500);
}
......@@ -1006,8 +1001,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
int32_t epoch = pParam->epoch;
taosMemoryFree(pParam);
if (code != 0) {
tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
if (pMsg->pData) taosMemoryFreeClear(pMsg->pData);
tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
goto CREATE_MSG_FAIL;
}
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) {
......@@ -1083,7 +1082,7 @@ CREATE_MSG_FAIL:
return -1;
}
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
......@@ -1112,10 +1111,10 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[80];
tFormatOffset(buf, 80, &pVgCur->currentOffsetNew);
tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
}
}
}
......@@ -1142,93 +1141,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffsetNew = offsetNew,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0,
};
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(newTopics, &topic);
}
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
taosHashCleanup(pHash);
tmq->clientTopics = newTopics;
if (taosArrayGetSize(tmq->clientTopics) == 0)
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
else
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
atomic_store_32(&tmq->epoch, epoch);
return set;
}
#if 0
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
topicNumGet);
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) {
return false;
}
SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
if (pHash == NULL) {
taosArrayDestroy(newTopics);
return false;
}
// find topic, build hash
for (int32_t i = 0; i < topicNumGet; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
topic.schema = pTopicEp->schema;
taosHashClear(pHash);
topic.topicName = strdup(pTopicEp->topic);
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName);
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) {
// find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur);
if (vgNumCur == 0) break;
for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
}
break;
}
}
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgNumGet; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset;
tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
if (pOffset != NULL) {
offset = *pOffset;
tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
vgKey);
}
tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset);
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = offset,
.currentOffset = offsetNew,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
......@@ -1251,7 +1164,6 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
atomic_store_32(&tmq->epoch, epoch);
return set;
}
#endif
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
......@@ -1278,7 +1190,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/
tmqUpdateEp2(tmq, head->epoch, &rsp);
tmqUpdateEp(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
} else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
......@@ -1430,7 +1342,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch;
/*pReq->currentOffset = reqOffset;*/
pReq->reqOffset = pVg->currentOffsetNew;
pReq->reqOffset = pVg->currentOffset;
pReq->reqId = generateRequestId();
pReq->useSnapshot = tmq->useSnapshot;
......@@ -1534,7 +1446,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*printf("send poll\n");*/
char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
......@@ -1552,7 +1464,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
tmqUpdateEp2(tmq, rspWrapper->epoch, rspMsg);
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/
*pReset = true;
} else {
......@@ -1586,7 +1498,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset;
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) {
taosFreeQitem(pollRspWrapper);
......@@ -1609,8 +1521,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset;
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
pVg->currentOffset.version = pollRspWrapper->metaRsp.rspOffset;
pVg->currentOffset.type = TMQ_OFFSET__LOG;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
......@@ -1653,6 +1565,17 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL;
}
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 10) {
return NULL;
}
tscDebug("consumer not ready, retry");
taosMsleep(500);
}
}
while (1) {
tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) return NULL;
......@@ -3384,10 +3307,10 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
//
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
tmqCommitInner(tmq, msg, 0, 1, cb, param);
}
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
//
return tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL);
return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
}
......@@ -284,8 +284,7 @@ static const SSysDbTableSchema consumerSchema[] = {
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},*/
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
......
......@@ -131,8 +131,9 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId,
mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1;
}
......@@ -275,6 +276,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
mInfo("try to recover consumer %ld", consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId;
......@@ -305,15 +307,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
#if 1
atomic_store_32(&pConsumer->hbStatus, 0);
#endif
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status);
#if 1
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
mInfo("try to recover consumer %ld", consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId;
......@@ -326,6 +327,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
#endif
if (status != MQ_CONSUMER_STATUS__READY) {
mInfo("consumer %ld not ready, status: %s", consumerId, mndConsumerStatusName(status));
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1;
}
......@@ -939,13 +941,9 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
colDataAppend(pColInfo, numOfRows, NULL, true);
}
// pid
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
// end point
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
/*pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);*/
/*colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);*/
// up time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
......@@ -157,7 +157,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
......
......@@ -183,7 +183,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0;
}
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) {
STqOffset offset = {0};
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
......@@ -302,6 +302,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
", in vgId:%d, subkey %s, handle consumer id %" PRId64,
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
return -1;
}
......
......@@ -204,7 +204,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break;
case TDMT_VND_MQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
pMsg->contLen - sizeof(SMsgHead), version) < 0) {
goto _err;
}
break;
......
......@@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus
......
......@@ -124,7 +124,7 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
}
}
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem);
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
......@@ -171,8 +171,8 @@ void streamFreeQitem(SStreamQueueItem* data) {
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
void* data = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(data);
void* dataStr = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(dataStr);
taosMemoryFree(pRef);
}
}
......
......@@ -370,80 +370,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
return 0;
}
int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
void* buf = NULL;
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(data->blocks);
ASSERT(blockNum != 0);
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.dataSrcVgId = data->srcVgId,
.upstreamTaskId = pTask->taskId,
.upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
};
req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
if (req.data == NULL || req.dataLen == NULL) {
goto FAIL;
}
for (int32_t i = 0; i < blockNum; i++) {
SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i);
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
goto FAIL;
}
}
int32_t vgId = 0;
int32_t downstreamTaskId = 0;
// find ep
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
vgId = pTask->fixedEpDispatcher.nodeId;
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
downstreamTaskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
// TODO get ctbName for each block
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
// TODO: get hash function by hashMethod
// get groupId, compute hash value
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
// get node
// TODO: optimize search process
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
vgId = pVgInfo->vgId;
downstreamTaskId = pVgInfo->taskId;
*ppEpSet = &pVgInfo->epSet;
break;
}
}
}
ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
req.taskId = downstreamTaskId;
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId);
streamDispatchOneReq(pTask, &req, vgId, *ppEpSet);
code = 0;
FAIL:
if (code < 0 && buf) rpcFreeCont(buf);
if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree);
if (req.dataLen) taosArrayDestroy(req.dataLen);
return code;
}
int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
......@@ -461,7 +387,7 @@ int32_t streamDispatch(SStreamTask* pTask) {
}
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qDebug("stream continue dispatching: task %d", pTask->taskId);
qDebug("stream dispatching: task %d", pTask->taskId);
int32_t code = 0;
if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
......
......@@ -82,17 +82,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
return 0;
}
#if 0
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
int32_t childId = pBlock->childId;
int64_t ver = pBlock->sourceVer;
SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId);
/*pChildInfo-> = ver;*/
return 0;
}
#endif
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
......@@ -150,10 +139,11 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0;
}
// TODO: handle version
int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
int32_t cnt = 1;
int32_t batchCnt = 1;
void* data = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
......@@ -169,13 +159,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
} else {
void* newRet;
if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) {
if ((newRet = streamMergeQueueItem(data, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
cnt++;
batchCnt++;
data = newRet;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess(pTask->inputQueue);
}
}
......@@ -198,16 +187,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt);
streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId);
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
// TODO log failed ver
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(data);
return -1;
}
......@@ -218,17 +205,18 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)data)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver;
}
if (streamTaskOutput(pTask, qRes) < 0) {
// TODO save failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
streamFreeQitem(data);
taosFreeQitem(qRes);
return -1;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
} else {
taosArrayDestroy(pRes);
}
......
......@@ -421,6 +421,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
wDebug("vgId:%d, wal write log %ld, msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType));
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
// TODO ftruncate
terrno = TAOS_SYSTEM_ERROR(errno);
......
......@@ -16,8 +16,8 @@
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
#include "tdef.h"
#include "pthread.h"
#include "tdef.h"
#ifdef WINDOWS
......@@ -77,8 +77,8 @@ int32_t tsem_wait(tsem_t* sem) {
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
struct timespec ts, rel;
FILETIME ft_before, ft_after;
int rc;
FILETIME ft_before, ft_after;
int rc;
rel.tv_sec = 0;
rel.tv_nsec = nanosecs;
......@@ -218,7 +218,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// int e = errno;
// if (e == EEXIST) continue;
// if (e == EINTR) continue;
// fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
// fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem,
// e, strerror(e));
// abort();
// } while (p->sem == SEM_FAILED);
......@@ -232,7 +233,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// }
// kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
// if (ret != KERN_SUCCESS) {
// fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__,
// sem);
// // we fail-fast here, because we have less-doc about semaphore_create for the moment
// abort();
......@@ -259,8 +261,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// }
// struct tsem_s *p = *sem;
// if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); abort();
// }
// #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) {
......@@ -271,7 +273,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// p->val -= 1;
// if (p->val < 0) {
// if (taosThreadCondWait(&p->cond, &p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__,
// sem);
// abort();
// }
......@@ -298,8 +301,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// }
// struct tsem_s *p = *sem;
// if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); abort();
// }
// #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) {
......@@ -310,7 +313,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// p->val += 1;
// if (p->val <= 0) {
// if (taosThreadCondSignal(&p->cond)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__,
// sem);
// abort();
// }
......@@ -333,7 +337,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// int tsem_destroy(tsem_t *sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// if (!*sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// // abort();
// return 0;
// }
......@@ -371,7 +376,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// int r = sem_unlink(name);
// if (r) {
// int e = errno;
// fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
// fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem,
// e, strerror(e));
// abort();
// }
......@@ -386,225 +392,189 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// *sem = NULL;
// return 0;
// }
typedef struct
{
pthread_mutex_t count_lock;
pthread_cond_t count_bump;
unsigned int count;
}bosal_sem_t;
int tsem_init(tsem_t *psem, int flags, unsigned int count)
{
bosal_sem_t *pnewsem;
int result;
pnewsem = (bosal_sem_t *)malloc(sizeof(bosal_sem_t));
if (! pnewsem)
{
return -1;
}
result = pthread_mutex_init(&pnewsem->count_lock, NULL);
if (result)
{
free(pnewsem);
return result;
}
result = pthread_cond_init(&pnewsem->count_bump, NULL);
if (result)
{
pthread_mutex_destroy(&pnewsem->count_lock);
free(pnewsem);
return result;
}
pnewsem->count = count;
*psem = (tsem_t)pnewsem;
return 0;
typedef struct {
pthread_mutex_t count_lock;
pthread_cond_t count_bump;
unsigned int count;
} bosal_sem_t;
int tsem_init(tsem_t *psem, int flags, unsigned int count) {
bosal_sem_t *pnewsem;
int result;
pnewsem = (bosal_sem_t *)malloc(sizeof(bosal_sem_t));
if (!pnewsem) {
return -1;
}
result = pthread_mutex_init(&pnewsem->count_lock, NULL);
if (result) {
free(pnewsem);
return result;
}
result = pthread_cond_init(&pnewsem->count_bump, NULL);
if (result) {
pthread_mutex_destroy(&pnewsem->count_lock);
free(pnewsem);
return result;
}
pnewsem->count = count;
*psem = (tsem_t)pnewsem;
return 0;
}
int tsem_destroy(tsem_t *psem)
{
bosal_sem_t *poldsem;
int tsem_destroy(tsem_t *psem) {
bosal_sem_t *poldsem;
if (! psem)
{
return EINVAL;
}
poldsem = (bosal_sem_t *)*psem;
if (!psem) {
return EINVAL;
}
poldsem = (bosal_sem_t *)*psem;
pthread_mutex_destroy(&poldsem->count_lock);
pthread_cond_destroy(&poldsem->count_bump);
free(poldsem);
return 0;
pthread_mutex_destroy(&poldsem->count_lock);
pthread_cond_destroy(&poldsem->count_bump);
free(poldsem);
return 0;
}
int tsem_post(tsem_t *psem)
{
bosal_sem_t *pxsem;
int result, xresult;
int tsem_post(tsem_t *psem) {
bosal_sem_t *pxsem;
int result, xresult;
if (! psem)
{
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
if (!psem) {
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result)
{
return result;
}
pxsem->count = pxsem->count + 1;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result) {
return result;
}
pxsem->count = pxsem->count + 1;
xresult = pthread_cond_signal(&pxsem->count_bump);
xresult = pthread_cond_signal(&pxsem->count_bump);
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
}
return 0;
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result) {
return result;
}
if (xresult) {
errno = xresult;
return -1;
}
return 0;
}
int tsem_trywait(tsem_t *psem)
{
bosal_sem_t *pxsem;
int result, xresult;
int tsem_trywait(tsem_t *psem) {
bosal_sem_t *pxsem;
int result, xresult;
if (! psem)
{
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
if (!psem) {
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result)
{
return result;
}
xresult = 0;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result) {
return result;
}
xresult = 0;
if (pxsem->count > 0)
{
pxsem->count--;
}
else
{
xresult = EAGAIN;
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
}
return 0;
if (pxsem->count > 0) {
pxsem->count--;
} else {
xresult = EAGAIN;
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result) {
return result;
}
if (xresult) {
errno = xresult;
return -1;
}
return 0;
}
int tsem_wait(tsem_t *psem)
{
bosal_sem_t *pxsem;
int result, xresult;
int tsem_wait(tsem_t *psem) {
bosal_sem_t *pxsem;
int result, xresult;
if (! psem)
{
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
if (!psem) {
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result)
{
return result;
}
xresult = 0;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result) {
return result;
}
xresult = 0;
if (pxsem->count == 0)
{
xresult = pthread_cond_wait(&pxsem->count_bump, &pxsem->count_lock);
}
if (! xresult)
{
if (pxsem->count > 0)
{
pxsem->count--;
}
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
if (pxsem->count == 0) {
xresult = pthread_cond_wait(&pxsem->count_bump, &pxsem->count_lock);
}
if (!xresult) {
if (pxsem->count > 0) {
pxsem->count--;
}
return 0;
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result) {
return result;
}
if (xresult) {
errno = xresult;
return -1;
}
return 0;
}
int tsem_timewait(tsem_t *psem, int64_t nanosecs)
{
int tsem_timewait(tsem_t *psem, int64_t nanosecs) {
struct timespec abstim = {
.tv_sec = 0,
.tv_nsec = nanosecs,
};
bosal_sem_t *pxsem;
int result, xresult;
bosal_sem_t *pxsem;
int result, xresult;
if (! psem)
{
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
if (!psem) {
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result)
{
return result;
}
xresult = 0;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result) {
return result;
}
xresult = 0;
if (pxsem->count == 0)
{
xresult = pthread_cond_timedwait(&pxsem->count_bump, &pxsem->count_lock, &abstim);
}
if (! xresult)
{
if (pxsem->count > 0)
{
pxsem->count--;
}
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
if (pxsem->count == 0) {
xresult = pthread_cond_timedwait(&pxsem->count_bump, &pxsem->count_lock, &abstim);
}
if (!xresult) {
if (pxsem->count > 0) {
pxsem->count--;
}
return 0;
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result) {
return result;
}
if (xresult) {
errno = xresult;
return -1;
}
return 0;
}
bool taosCheckPthreadValid(TdThread thread) {
bool taosCheckPthreadValid(TdThread thread) {
int32_t ret = taosThreadKill(thread, 0);
if (ret == ESRCH) return false;
if (ret == EINVAL) return false;
// alive
return true;
}
}
int64_t taosGetSelfPthreadId() {
TdThread thread = taosThreadSelf();
......@@ -651,7 +621,13 @@ int64_t taosGetSelfPthreadId() {
int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }
void taosResetPthread(TdThread* thread) { *thread = 0; }
bool taosComparePthread(TdThread first, TdThread second) { return first == second; }
int32_t taosGetPId() { return getpid(); }
int32_t taosGetPId() {
static int32_t pid;
if (pid != 0) return pid;
pid = getpid();
return pid;
}
int32_t taosGetAppName(char* name, int32_t* len) {
const char* self = "/proc/self/exe";
......
......@@ -623,6 +623,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file"
//tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
#ifdef TAOS_ERROR_C
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册