提交 4cc10cfa 编写于 作者: H Hongze Cheng

more

上级 3a2975f1
......@@ -20,11 +20,11 @@
#include <stdint.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "tmsg.h"
#include "tarray.h"
#include "tdataformat.h"
#include "tname.h"
#include "hash.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlist.h"
......@@ -46,6 +46,16 @@ extern "C" {
#define TSDB_STATE_BAD_META 0x1
#define TSDB_STATE_BAD_DATA 0x2
typedef struct SDataStatis {
int16_t colId;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
} SDataStatis;
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
void *appH;
......
......@@ -27,6 +27,7 @@ extern "C" {
#include <fcntl.h>
#include <float.h>
#include <inttypes.h>
#include <limits.h>
#include <locale.h>
#include <math.h>
#include <regex.h>
......
......@@ -3,7 +3,7 @@ add_library(tsdb STATIC ${TSDB_SRC})
target_include_directories(
tsdb
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb"
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb2"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(tsdb os util common tfs)
\ No newline at end of file
......@@ -16,6 +16,8 @@
#ifndef _TS_TSDB_FILE_H_
#define _TS_TSDB_FILE_H_
#include "os.h"
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
......@@ -32,7 +34,7 @@
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
#define TSDB_FILE_FSYNC(tf) taosFsync(TSDB_FILE_FD(tf))
#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_FD(tf))
#define TSDB_FILE_STATE(tf) ((tf)->state)
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
......@@ -108,7 +110,7 @@ static FORCE_INLINE void tsdbCloseMFile(SMFile* pMFile) {
static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t loffset = taosLSeek(TSDB_FILE_FD(pMFile), offset, whence);
int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pMFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -120,7 +122,7 @@ static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int wh
static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte);
int64_t nwrite = taosWriteFile(pMFile->fd, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -162,7 +164,7 @@ static FORCE_INLINE int tsdbRemoveMFile(SMFile* pMFile) { return tfsremove(TSDB_
static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t nread = taosRead(pMFile->fd, buf, nbyte);
int64_t nread = taosReadFile(pMFile->fd, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -223,7 +225,7 @@ static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) {
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t loffset = taosLSeek(TSDB_FILE_FD(pDFile), offset, whence);
int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pDFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -235,7 +237,7 @@ static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int wh
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte);
int64_t nwrite = taosWriteFile(pDFile->fd, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -277,7 +279,7 @@ static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosRead(pDFile->fd, buf, nbyte);
int64_t nread = taosReadFile(pDFile->fd, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -298,10 +300,10 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
// =============== SDFileSet
typedef struct {
int fid;
int state;
int fid;
int state;
uint16_t ver; // fset version
SDFile files[TSDB_FILE_MAX];
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
typedef enum {
......@@ -347,7 +349,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdbRepo* pRepo, SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
ASSERT_TSDB_FSET_NFILES_VALID(pSet);
......
......@@ -16,7 +16,10 @@
#ifndef _TD_TSDB_HEALTH_H_
#define _TD_TSDB_HEALTH_H_
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
#include "os.h"
#include "tsdb.h"
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_MEMORY_H_
#define _TD_TSDB_MEMORY_H_
static void * taosTMalloc(size_t size);
static void * taosTCalloc(size_t nmemb, size_t size);
static void * taosTRealloc(void *ptr, size_t size);
static void * taosTZfree(void *ptr);
static size_t taosTSizeof(void *ptr);
static void taosTMemset(void *ptr, int c);
static FORCE_INLINE void *taosTMalloc(size_t size) {
if (size <= 0) return NULL;
void *ret = malloc(size + sizeof(size_t));
if (ret == NULL) return NULL;
*(size_t *)ret = size;
return (void *)((char *)ret + sizeof(size_t));
}
static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) {
size_t tsize = nmemb * size;
void * ret = taosTMalloc(tsize);
if (ret == NULL) return NULL;
taosTMemset(ret, 0);
return ret;
}
static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); }
static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) {
if (ptr == NULL) return taosTMalloc(size);
if (size <= taosTSizeof(ptr)) return ptr;
void * tptr = (void *)((char *)ptr - sizeof(size_t));
size_t tsize = size + sizeof(size_t);
void* tptr1 = realloc(tptr, tsize);
if (tptr1 == NULL) return NULL;
tptr = tptr1;
*(size_t *)tptr = size;
return (void *)((char *)tptr + sizeof(size_t));
}
static FORCE_INLINE void* taosTZfree(void* ptr) {
if (ptr) {
free((void*)((char*)ptr - sizeof(size_t)));
}
return NULL;
}
#endif /* _TD_TSDB_MEMORY_H_ */
\ No newline at end of file
......@@ -16,15 +16,6 @@
#ifndef _TD_TSDB_INT_H_
#define _TD_TSDB_INT_H_
// // TODO: remove the include
// #include <errno.h>
// #include <fcntl.h>
// #include <limits.h>
// #include <inttypes.h>
// #include <sys/stat.h>
// #include <sys/types.h>
// #include <semaphore.h>
// #include <dirent.h>
#include "os.h"
#include "tlog.h"
......@@ -34,13 +25,13 @@
#include "tskiplist.h"
#include "tdataformat.h"
#include "tcoding.h"
#include "tscompression.h"
#include "tcompression.h"
#include "tlockfree.h"
#include "tlist.h"
#include "hash.h"
#include "thash.h"
#include "tarray.h"
#include "tfs.h"
#include "tsocket.h"
#include "tsdbMemory.h"
#include "tsdb.h"
......
......@@ -13,50 +13,49 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbHealth.h"
#include "os.h"
#include "taosmsg.h"
#include "tarray.h"
#include "query.h"
#include "tarray.h"
#include "tglobal.h"
#include "tlist.h"
#include "tsdbint.h"
#include "tmsg.h"
#include "tsdbBuffer.h"
#include "tsdbLog.h"
#include "tsdbHealth.h"
#include "ttimer.h"
#include "tsdbint.h"
#include "tthread.h"
#include "ttimer.h"
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo) {
STsdbBufPool* pPool = pRepo->pPool;
int32_t cnt = 0;
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
int32_t cnt = 0;
if(tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock* pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) {
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
// append error
tsdbFreeBufBlock(pBufBlock);
} else {
pPool->nElasticBlocks ++;
cnt ++ ;
}
if (tdListAppend(pPool->bufBlockList, (void*)(&pBufBlock)) < 0) {
// append error
tsdbFreeBufBlock(pBufBlock);
} else {
pPool->nElasticBlocks++;
cnt++;
}
}
}
return cnt;
}
return cnt;
}
// switch anther thread to run
void* cbKillQueryFree(void* param) {
STsdbRepo* pRepo = (STsdbRepo*)param;
STsdbRepo* pRepo = (STsdbRepo*)param;
// vnode
if(pRepo->appH.notifyStatus) {
if (pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
}
// free
if(pRepo->pthread){
// free
if (pRepo->pthread) {
void* p = pRepo->pthread;
pRepo->pthread = NULL;
free(p);
......@@ -66,15 +65,16 @@ void* cbKillQueryFree(void* param) {
}
// return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
bool tsdbUrgeQueryFree(STsdbRepo* pRepo) {
// check previous running
if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
if (pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo),
pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
return false;
}
// create new
pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
if(pRepo->pthread == NULL) {
if (pRepo->pthread == NULL) {
tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
return false;
}
......@@ -82,17 +82,17 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
}
bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
int32_t nMaxElastic = pRepo->config.totalBlocks/3;
int32_t nMaxElastic = pRepo->config.totalBlocks / 3;
STsdbBufPool* pPool = pRepo->pPool;
if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic);
if (pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo),
pPool->nElasticBlocks, nMaxElastic);
return false;
}
return true;
}
bool tsdbNoProblem(STsdbRepo* pRepo) {
if(listNEles(pRepo->pPool->bufBlockList) == 0)
return false;
if (listNEles(pRepo->pPool->bufBlockList) == 0) return false;
return true;
}
\ No newline at end of file
......@@ -16,7 +16,6 @@
#include "os.h"
#include "tdataformat.h"
#include "tskiplist.h"
#include "tulog.h"
#include "talgo.h"
#include "tcompare.h"
#include "exception.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册