diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index ea5b574ceda600b7322cc97f75b3947f25dc3fe5..d74584f844f055dc8f6f7e2101711c4b25b72d28 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -15,6 +15,7 @@ #include "tsdb.h" #include "tsdbFSet2.h" +#include "tsdbMerge.h" #include "tsdbReadUtil.h" #include "tsdbSttFileRW.h" @@ -352,10 +353,14 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray return TSDB_CODE_SUCCESS; } -static int32_t uidComparFn(const void *p1, const void *p2) { - const uint64_t *uid1 = p1; +static int32_t suidComparFn(const void *target, const void *p2) { + const uint64_t *targetUid = target; const uint64_t *uid2 = p2; - return (*uid1) - (*uid2); + if (*uid2 == (*targetUid)) { + return 0; + } else { + return (*targetUid) < (*uid2) ? -1:1; + } } static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid, @@ -372,29 +377,55 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6 } } - // for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) { - // SStatisBlk *p = &pStatisBlkArray->data[i]; - // if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) { - // break; - // } - // - // if (p->maxTbid.uid < uid) { - // break; - // } - // } - if (i >= TARRAY2_SIZE(pStatisBlkArray)) { return false; } - SStatisBlk *p = &pStatisBlkArray->data[i]; - STbStatisBlock block = {0}; - tsdbSttFileReadStatisBlock(pReader, p, &block); + while(i < TARRAY2_SIZE(pStatisBlkArray)) { + SStatisBlk *p = &pStatisBlkArray->data[i]; + if (p->minTbid.suid > suid) { + return false; + } + + STbStatisBlock block = {0}; + tsdbSttFileReadStatisBlock(pReader, p, &block); + + int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ); + if (index == -1) { + tStatisBlockDestroy(&block); + return false; + } + + int32_t j = index; + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else if (block.uid->data[j] > uid) { + while (j >= 0 && block.suid->data[j] == suid) { + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else { + j -= 1; + } + } + } else { + j = index + 1; + while (j < block.suid->size && block.suid->data[j] == suid) { + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else { + j += 1; + } + } + } - int32_t index = tarray2SearchIdx(block.uid, &uid, sizeof(int64_t), uidComparFn, TD_EQ); - tStatisBlockDestroy(&block); + tStatisBlockDestroy(&block); + i += 1; + } - return (index != -1); + return false; } int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward, @@ -445,12 +476,12 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr); } - // bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader); - // if (!exists) { - // pIter->iSttBlk = -1; - // pIter->pSttBlk = NULL; - // return TSDB_CODE_SUCCESS; - // } + bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader); + if (!exists) { + pIter->iSttBlk = -1; + pIter->pSttBlk = NULL; + return TSDB_CODE_SUCCESS; + } // find the start block, actually we could load the position to avoid repeatly searching for the start position when // the skey is updated. diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 3af9d2a07a4e1b31cd22d6de82b814b50167e13c..a4d5715083a2b6d82f048b8950c2a74c8df11f6d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -100,7 +100,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const .type = fobj->f->type, .did = did[0], .fid = fobj->f->fid, - .cid = rtner->cid, + .cid = fobj->f->cid, .size = fobj->f->size, .stt[0] = { diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 39ba92fdc5e93d45facd85a2015461ad1fd68d8c..4c4e22bdd92846631026a74064f4f4d6a0a14a4f 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -17,6 +17,15 @@ #include #include "os.h" +#ifdef WINDOWS +#define THREAD_PTR_CHECK(p) \ + do { \ + if (!(p) || !(*(p))) return 0; \ + } while (0); +#else +#define THREAD_PTR_CHECK(p) +#endif + int32_t taosThreadCreate(TdThread *tid, const TdThreadAttr *attr, void *(*start)(void *), void *arg) { return pthread_create(tid, attr, start, arg); } @@ -83,9 +92,13 @@ int32_t taosThreadCondSignal(TdThreadCond *cond) { return pthread_cond_signal(co int32_t taosThreadCondBroadcast(TdThreadCond *cond) { return pthread_cond_broadcast(cond); } -int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) { return pthread_cond_wait(cond, mutex); } +int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_cond_wait(cond, mutex); +} int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const struct timespec *abstime) { + THREAD_PTR_CHECK(mutex) return pthread_cond_timedwait(cond, mutex, abstime); } @@ -124,24 +137,37 @@ int32_t taosThreadKeyDelete(TdThreadKey key) { return pthread_key_delete(key); } int32_t taosThreadKill(TdThread thread, int32_t sig) { return pthread_kill(thread, sig); } // int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) { +// THREAD_PTR_CHECK(mutex) // return pthread_mutex_consistent(mutex); // } -int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) { return pthread_mutex_destroy(mutex); } +int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_mutex_destroy(mutex); +} int32_t taosThreadMutexInit(TdThreadMutex *mutex, const TdThreadMutexAttr *attr) { return pthread_mutex_init(mutex, attr); } -int32_t taosThreadMutexLock(TdThreadMutex *mutex) { return pthread_mutex_lock(mutex); } +int32_t taosThreadMutexLock(TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_mutex_lock(mutex); +} // int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) { // return pthread_mutex_timedlock(mutex, abstime); // } -int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) { return pthread_mutex_trylock(mutex); } +int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_mutex_trylock(mutex); +} -int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) { return pthread_mutex_unlock(mutex); } +int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_mutex_unlock(mutex); +} int32_t taosThreadMutexAttrDestroy(TdThreadMutexAttr *attr) { return pthread_mutexattr_destroy(attr); } @@ -224,6 +250,7 @@ int32_t taosThreadSetSchedParam(TdThread thread, int32_t policy, const struct sc int32_t taosThreadSetSpecific(TdThreadKey key, const void *value) { return pthread_setspecific(key, value); } int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) { + THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_destroy((pthread_mutex_t *)lock); #else @@ -242,6 +269,7 @@ int32_t taosThreadSpinInit(TdThreadSpinlock *lock, int32_t pshared) { } int32_t taosThreadSpinLock(TdThreadSpinlock *lock) { + THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_lock((pthread_mutex_t *)lock); #else @@ -250,6 +278,7 @@ int32_t taosThreadSpinLock(TdThreadSpinlock *lock) { } int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) { + THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_trylock((pthread_mutex_t *)lock); #else @@ -258,6 +287,7 @@ int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) { } int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) { + THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_unlock((pthread_mutex_t *)lock); #else