diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index cd64e48347f8fb99cd70000042fa010b325831a5..7365d7b1519b9bd022c73a80b5216d26b4050626 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -25,6 +25,7 @@ #include "tsdb.h" #include "tskiplist.h" #include "tutil.h" +#include "trwlatch.h" #ifdef __cplusplus extern "C" { @@ -59,6 +60,7 @@ typedef struct STable { TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure char* sql; void* cqhandle; + SRWLatch latch; // TODO: implementa latch functions T_REF_DECLARE(); } STable; diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 16f06c82bbf310eb94ab389d6c8d028d8d4b160c..2084203814a72e0abb68b7d14823a7651b043221 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -120,7 +120,9 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { STsdbBufPool *pBufPool = pRepo->pPool; while (POOL_IS_EMPTY(pBufPool)) { + pRepo->repoLocked = false; pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); + pRepo->repoLocked = true; } SListNode * pNode = tdListPopHead(pBufPool->bufBlockList); diff --git a/src/util/inc/trwlatch.h b/src/util/inc/trwlatch.h new file mode 100644 index 0000000000000000000000000000000000000000..c6923f0e9028bb2ee062a6dd9314b75ef952d81a --- /dev/null +++ b/src/util/inc/trwlatch.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef __TD_RWLATCH_H__ +#define __TD_RWLATCH_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef int32_t SRWLatch; + +void taosInitRWLatch(SRWLatch *pLatch); +void taosWLockLatch(SRWLatch *pLatch); +void taosWUnLockLatch(SRWLatch *pLatch); +void taosRLockLatch(SRWLatch *pLatch); +void taosRUnLockLatch(SRWLatch *pLatch); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/util/src/trwlatch.c b/src/util/src/trwlatch.c new file mode 100644 index 0000000000000000000000000000000000000000..cc027aa3df4f14064ba668271a872a2f967ded39 --- /dev/null +++ b/src/util/src/trwlatch.c @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +// #define _GNU_SOURCE +// #include + +#include "trwlatch.h" +#include "os.h" + +#define TD_RWLATCH_WRITE_FLAG 0x40000000 + +void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; } + +void taosWLockLatch(SRWLatch *pLatch) { + SRWLatch oLatch, nLatch; + int nLoops = 0; + + // Set write flag + while (1) { + oLatch = atomic_load_32(pLatch); + if (oLatch & TD_RWLATCH_WRITE_FLAG) { + nLoops++; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + continue; + } + + nLatch = oLatch | TD_RWLATCH_WRITE_FLAG; + if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break; + } + + // wait for all reads end + nLoops = 0; + while (1) { + oLatch = atomic_load_32(pLatch); + if (oLatch == TD_RWLATCH_WRITE_FLAG) break; + nLoops++; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } +} + +void taosWUnLockLatch(SRWLatch *pLatch) { atomic_store_32(pLatch, 0); } + +void taosRLockLatch(SRWLatch *pLatch) { + SRWLatch oLatch, nLatch; + int nLoops = 0; + + while (1) { + oLatch = atomic_load_32(pLatch); + if (oLatch & TD_RWLATCH_WRITE_FLAG) { + nLoops++; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + continue; + } + + nLatch = oLatch + 1; + if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break; + } +} + +void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); } \ No newline at end of file