未验证 提交 c65dc5d3 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18454 from taosdata/feature/stream

fix(tmq): time wait
......@@ -29,7 +29,7 @@ typedef dispatch_semaphore_t tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t nanosecs);
int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
......@@ -38,7 +38,7 @@ int tsem_destroy(tsem_t *sem);
#define tsem_t sem_t
#define tsem_init sem_init
int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t nanosecs);
int tsem_timewait(tsem_t *sim, int64_t milis);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
......
......@@ -25,6 +25,13 @@
#include "tref.h"
#include "ttimer.h"
#if 0
#undef tsem_post
#define tsem_post(x) \
tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
sem_post(x)
#endif
int32_t tmqAskEp(tmq_t* tmq, bool async);
typedef struct {
......@@ -733,12 +740,12 @@ void tmqSendHbReq(void* param, void* tmrId) {
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
if (tlen < 0) {
tscError("tSerializeSMqHbReq failed");
return;
}
void *pReq = taosMemoryCalloc(1, tlen);
void* pReq = taosMemoryCalloc(1, tlen);
if (tlen < 0) {
tscError("failed to malloc MqHbReq msg, size:%d", tlen);
return;
......@@ -1397,12 +1404,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
req.epoch = tmq->epoch;
strcpy(req.cgroup, tmq->groupId);
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("tSerializeSMqAskEpReq failed");
return -1;
}
void *pReq = taosMemoryCalloc(1, tlen);
void* pReq = taosMemoryCalloc(1, tlen);
if (tlen < 0) {
tscError("failed to malloc askEpReq msg, size:%d", tlen);
return -1;
......@@ -1461,7 +1468,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
return code;
}
void tmqBuildConsumeReqImpl(SMqPollReq *pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
/*strcpy(pReq->topic, pTopic->topicName);*/
/*strcpy(pReq->cgroup, tmq->groupId);*/
......@@ -1561,20 +1568,20 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
tsem_post(&tmq->rspSem);
return -1;
}
char *msg = taosMemoryCalloc(1, msgSize);
char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
return -1;
}
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
return -1;
}
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
taosMemoryFree(msg);
......@@ -1797,17 +1804,20 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL;
}
if (timeout != -1) {
int64_t endTime = taosGetTimestampMs();
int64_t leftTime = endTime - startTime;
if (leftTime > timeout) {
tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", end time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, endTime);
int64_t currentTime = taosGetTimestampMs();
int64_t passedTime = currentTime - startTime;
if (passedTime > timeout) {
tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}
tsem_timewait(&tmq->rspSem, leftTime * 1000);
/*tscInfo("consumer:%" PRId64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
/*", left time %" PRId64,*/
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/
tsem_timewait(&tmq->rspSem, (timeout - passedTime));
} else {
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
tsem_timewait(&tmq->rspSem, 500 * 1000);
tsem_timewait(&tmq->rspSem, 1000);
}
}
}
......
......@@ -75,17 +75,18 @@ int32_t tsem_wait(tsem_t* sem) {
return ret;
}
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
struct timespec ts, rel;
FILETIME ft_before, ft_after;
int rc;
rel.tv_sec = 0;
rel.tv_nsec = nanosecs;
GetSystemTimeAsFileTime(&ft_before);
int32_t tsem_timewait(tsem_t* sem, int64_t milis) {
return tsem_wait(sem);
#if 0
struct timespec ts;
timespec_get(&ts);
ts.tv_nsec += ms * 1000000;
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
/*GetSystemTimeAsFileTime(&ft_before);*/
// errno = 0;
rc = sem_timedwait(sem, pthread_win32_getabstime_np(&ts, &rel));
rc = sem_timedwait(sem, ts);
/* This should have timed out */
// assert(errno == ETIMEDOUT);
......@@ -102,6 +103,7 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// return 1;
// }
return rc;
#endif
}
#elif defined(_TD_DARWIN_64)
......@@ -133,9 +135,9 @@ int tsem_wait(tsem_t *psem) {
return 0;
}
int tsem_timewait(tsem_t *psem, int64_t nanosecs) {
int tsem_timewait(tsem_t *psem, int64_t milis) {
if (psem == NULL || *psem == NULL) return -1;
dispatch_semaphore_wait(*psem, nanosecs);
dispatch_semaphore_wait(*psem, milis * 1000 * 1000);
return 0;
}
......@@ -227,15 +229,20 @@ int32_t tsem_wait(tsem_t* sem) {
return ret;
}
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
int ret = 0;
struct timespec tv = {
.tv_sec = 0,
.tv_nsec = nanosecs,
};
struct timespec ts = {0};
if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
return -1;
}
ts.tv_nsec += ms * 1000000;
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
while ((ret = sem_timedwait(sem, &tv)) == -1 && errno == EINTR) continue;
while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue;
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册