提交 d138f77a 编写于 作者: Z zhihaop

feat: use sleepMutex to sleep the background thread

上级 e7fa3b0a
...@@ -35,12 +35,15 @@ typedef struct SAsyncBulkWriteDispatcher { ...@@ -35,12 +35,15 @@ typedef struct SAsyncBulkWriteDispatcher {
SArray* buffer; SArray* buffer;
// the mutex to protect the dispatcher. // the mutex to protect the dispatcher.
pthread_mutex_t mutex; pthread_mutex_t bufferMutex;
// the mutex to sleep the background thread.
pthread_mutex_t sleepMutex;
// the cond to signal to background thread. // the cond to signal to background thread.
pthread_cond_t timeout; pthread_cond_t timeout;
// the cond to signal to background thread. // the cond to signal when buffer not full.
pthread_cond_t notFull; pthread_cond_t notFull;
// the background thread to manage batching timeout. // the background thread to manage batching timeout.
......
...@@ -183,10 +183,10 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) { ...@@ -183,10 +183,10 @@ inline static SArray* dispatcherPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
* @return the items in the dispatcher, SArray<SSqlObj*>. * @return the items in the dispatcher, SArray<SSqlObj*>.
*/ */
inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatcher) { inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatcher) {
pthread_mutex_lock(&dispatcher->mutex); pthread_mutex_lock(&dispatcher->bufferMutex);
SArray* statements = dispatcherPollAll(dispatcher); SArray* statements = dispatcherPollAll(dispatcher);
pthread_cond_broadcast(&dispatcher->notFull); pthread_cond_broadcast(&dispatcher->notFull);
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->bufferMutex);
return statements; return statements;
} }
...@@ -198,19 +198,19 @@ inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatche ...@@ -198,19 +198,19 @@ inline static SArray* dispatcherLockPollAll(SAsyncBulkWriteDispatcher* dispatche
* @return return whether offer success. * @return return whether offer success.
*/ */
inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) { inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSql) {
pthread_mutex_lock(&dispatcher->mutex); pthread_mutex_lock(&dispatcher->bufferMutex);
// if dispatcher is shutdown, must fail back to normal insertion. // if dispatcher is shutdown, must fail back to normal insertion.
// usually not happen, unless taos_query_a(...) after taos_close(...). // usually not happen, unless taos_query_a(...) after taos_close(...).
if (atomic_load_8(&dispatcher->shutdown)) { if (atomic_load_8(&dispatcher->shutdown)) {
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->bufferMutex);
return false; return false;
} }
// the buffer is full. // the buffer is full.
while (dispatcher->currentSize >= dispatcher->batchSize) { while (dispatcher->currentSize >= dispatcher->batchSize) {
if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->mutex)) { if (pthread_cond_wait(&dispatcher->notFull, &dispatcher->bufferMutex)) {
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->bufferMutex);
return false; return false;
} }
} }
...@@ -226,7 +226,7 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq ...@@ -226,7 +226,7 @@ inline static bool dispatcherTryOffer(SAsyncBulkWriteDispatcher* dispatcher, SSq
taosArrayDestroy(&statements); taosArrayDestroy(&statements);
pthread_cond_broadcast(&dispatcher->notFull); pthread_cond_broadcast(&dispatcher->notFull);
} }
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->bufferMutex);
return true; return true;
} }
...@@ -264,23 +264,10 @@ _error: ...@@ -264,23 +264,10 @@ _error:
* @param millis the duration in milliseconds. * @param millis the duration in milliseconds.
* @return the timespec after `millis` ms. * @return the timespec after `millis` ms.
*/ */
static inline struct timespec afterMillis(struct timespec t, int32_t millis) { static inline void afterMillis(struct timespec *t, int32_t millis) {
t.tv_nsec += millis * 1000000L; t->tv_nsec += millis * 1000000L;
t.tv_sec += t.tv_nsec / 1000000000L; t->tv_sec += t->tv_nsec / 1000000000L;
t.tv_nsec %= 1000000000L; t->tv_nsec %= 1000000000L;
return t;
}
/**
* Get the duration in milliseconds from timespec s to timespec t.
* @param s the start timespec.
* @param t the end timespec.
* @return the duration in milliseconds.
*/
static inline int64_t durationMillis(struct timespec s, struct timespec t) {
int64_t d = (t.tv_sec - s.tv_sec) * 1000;
d += (t.tv_nsec - s.tv_nsec) / 1000000L;
return d;
} }
/** /**
...@@ -289,26 +276,19 @@ static inline int64_t durationMillis(struct timespec s, struct timespec t) { ...@@ -289,26 +276,19 @@ static inline int64_t durationMillis(struct timespec s, struct timespec t) {
* @param dispatcher the dispatcher thread to sleep. * @param dispatcher the dispatcher thread to sleep.
* @param timeout the timeout in CLOCK_REALTIME. * @param timeout the timeout in CLOCK_REALTIME.
*/ */
inline static void dispatcherSleepUntil(SAsyncBulkWriteDispatcher* dispatcher, struct timespec timeout) { inline static void dispatcherSleepUntil(SAsyncBulkWriteDispatcher* dispatcher, struct timespec* timeout) {
struct timespec current; pthread_mutex_lock(&dispatcher->sleepMutex);
clock_gettime(CLOCK_REALTIME, &current);
// if current > timeout, no sleep required.
if (durationMillis(current, timeout) <= 0) {
return;
}
pthread_mutex_lock(&dispatcher->mutex);
while (true) { while (true) {
// notified by dispatcherShutdown(...). // notified by dispatcherShutdown(...).
if (atomic_load_8(&dispatcher->shutdown)) { if (atomic_load_8(&dispatcher->shutdown)) {
break; break;
} }
if (pthread_cond_timedwait(&dispatcher->timeout, &dispatcher->mutex, &timeout)) { if (pthread_cond_timedwait(&dispatcher->timeout, &dispatcher->sleepMutex, timeout)) {
fflush(stdout);
break; break;
} }
} }
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->sleepMutex);
} }
/** /**
...@@ -319,9 +299,9 @@ static void* dispatcherTimeoutCallback(void* arg) { ...@@ -319,9 +299,9 @@ static void* dispatcherTimeoutCallback(void* arg) {
setThreadName("tscAsyncBackground"); setThreadName("tscAsyncBackground");
while (!atomic_load_8(&dispatcher->shutdown)) { while (!atomic_load_8(&dispatcher->shutdown)) {
struct timespec current; struct timespec timeout;
clock_gettime(CLOCK_REALTIME, &current); clock_gettime(CLOCK_REALTIME, &timeout);
struct timespec timeout = afterMillis(current, dispatcher->timeoutMs); afterMillis(&timeout, dispatcher->timeoutMs);
SArray* statements = dispatcherLockPollAll(dispatcher); SArray* statements = dispatcherLockPollAll(dispatcher);
dispatcherExecute(statements); dispatcherExecute(statements);
...@@ -329,7 +309,7 @@ static void* dispatcherTimeoutCallback(void* arg) { ...@@ -329,7 +309,7 @@ static void* dispatcherTimeoutCallback(void* arg) {
// Similar to scheduleAtFixedRate in Java, if the execution time exceed // Similar to scheduleAtFixedRate in Java, if the execution time exceed
// `timeoutMs` milliseconds, then there will be no sleep. // `timeoutMs` milliseconds, then there will be no sleep.
dispatcherSleepUntil(dispatcher, timeout); dispatcherSleepUntil(dispatcher, &timeout);
} }
return NULL; return NULL;
} }
...@@ -354,13 +334,15 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int ...@@ -354,13 +334,15 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int
} }
// init the mutex and the cond. // init the mutex and the cond.
pthread_mutex_init(&dispatcher->mutex, NULL); pthread_mutex_init(&dispatcher->bufferMutex, NULL);
pthread_mutex_init(&dispatcher->sleepMutex, NULL);
pthread_cond_init(&dispatcher->timeout, NULL); pthread_cond_init(&dispatcher->timeout, NULL);
pthread_cond_init(&dispatcher->notFull, NULL); pthread_cond_init(&dispatcher->notFull, NULL);
// init background thread. // init background thread.
if (pthread_create(&dispatcher->background, NULL, dispatcherTimeoutCallback, dispatcher)) { if (pthread_create(&dispatcher->background, NULL, dispatcherTimeoutCallback, dispatcher)) {
pthread_mutex_destroy(&dispatcher->mutex); pthread_mutex_destroy(&dispatcher->bufferMutex);
pthread_mutex_destroy(&dispatcher->sleepMutex);
pthread_cond_destroy(&dispatcher->timeout); pthread_cond_destroy(&dispatcher->timeout);
pthread_cond_destroy(&dispatcher->notFull); pthread_cond_destroy(&dispatcher->notFull);
taosArrayDestroy(&dispatcher->buffer); taosArrayDestroy(&dispatcher->buffer);
...@@ -378,10 +360,10 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int ...@@ -378,10 +360,10 @@ SAsyncBulkWriteDispatcher* createAsyncBulkWriteDispatcher(int32_t batchSize, int
*/ */
inline static void dispatcherShutdown(SAsyncBulkWriteDispatcher* dispatcher) { inline static void dispatcherShutdown(SAsyncBulkWriteDispatcher* dispatcher) {
// mark shutdown, signal shutdown to timeout thread. // mark shutdown, signal shutdown to timeout thread.
pthread_mutex_lock(&dispatcher->mutex); pthread_mutex_lock(&dispatcher->sleepMutex);
atomic_store_8(&dispatcher->shutdown, true); atomic_store_8(&dispatcher->shutdown, true);
pthread_cond_broadcast(&dispatcher->timeout); pthread_cond_broadcast(&dispatcher->timeout);
pthread_mutex_unlock(&dispatcher->mutex); pthread_mutex_unlock(&dispatcher->sleepMutex);
// make sure the timeout thread exit. // make sure the timeout thread exit.
pthread_join(dispatcher->background, NULL); pthread_join(dispatcher->background, NULL);
...@@ -409,7 +391,8 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) { ...@@ -409,7 +391,8 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher) {
taosArrayDestroy(&dispatcher->buffer); taosArrayDestroy(&dispatcher->buffer);
// destroy the mutex. // destroy the mutex.
pthread_mutex_destroy(&dispatcher->mutex); pthread_mutex_destroy(&dispatcher->bufferMutex);
pthread_mutex_destroy(&dispatcher->sleepMutex);
pthread_cond_destroy(&dispatcher->timeout); pthread_cond_destroy(&dispatcher->timeout);
pthread_cond_destroy(&dispatcher->notFull); pthread_cond_destroy(&dispatcher->notFull);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册