提交 8d15d79f 编写于 作者: M Minghao Li

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

add_executable(simulate_vnode "simulate_vnode.c")
target_link_libraries(simulate_vnode PUBLIC craft lz4 uv_a)
\ No newline at end of file
target_link_libraries(simulate_vnode craft lz4 uv_a)
\ No newline at end of file
......@@ -3,4 +3,4 @@ target_sources(singleNode
PRIVATE
"singleNode.c"
)
target_link_libraries(singleNode PUBLIC traft lz4 uv_a)
target_link_libraries(singleNode traft lz4 uv_a)
......@@ -2,11 +2,7 @@ aux_source_directory(src TMQ_DEMO_SRC)
add_executable(tmq ${TMQ_DEMO_SRC})
target_link_libraries(
tmq
PUBLIC taos
#PUBLIC util
#PUBLIC common
#PUBLIC os
tmq taos
)
target_include_directories(
tmq
......
......@@ -51,6 +51,10 @@ typedef enum {
} ECheckItemType;
typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig;
typedef enum {
TSDB_STATIS_OK = 0, // statis part exist and load successfully
TSDB_STATIS_NONE = 1, // statis part not exist
} ETsdbStatisStatus;
extern char *qtypeStr[];
......
......@@ -52,6 +52,13 @@ extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth;
extern int64_t tsTickPerDay[3];
// monitor
extern bool tsEnableMonitor;
extern int32_t tsMonitorInterval;
extern char tsMonitorFqdn[];
extern uint16_t tsMonitorPort;
extern int32_t tsMonitorMaxLogs;
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
......
......@@ -739,6 +739,9 @@ typedef struct {
int32_t maxRows;
int32_t commitTime;
int32_t fsyncPeriod;
uint32_t hashBegin;
uint32_t hashEnd;
int8_t hashMethod;
int8_t walLevel;
int8_t precision;
int8_t compression;
......@@ -749,6 +752,7 @@ typedef struct {
int8_t selfIndex;
int8_t streamMode;
SReplica replicas[TSDB_MAX_REPLICA];
} SCreateVnodeReq, SAlterVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......
......@@ -52,6 +52,8 @@ void deltaToUtcInitOnce();
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
void taosFormatUtcTime(char *buf, int32_t bufLen, int64_t time, int32_t precision);
#ifdef __cplusplus
}
#endif
......
......@@ -16,6 +16,8 @@
#ifndef _TD_MND_H_
#define _TD_MND_H_
#include "monitor.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -30,20 +32,6 @@ typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct SMnodeLoad {
int64_t numOfDnode;
int64_t numOfMnode;
int64_t numOfVgroup;
int64_t numOfDatabase;
int64_t numOfSuperTable;
int64_t numOfChildTable;
int64_t numOfNormalTable;
int64_t numOfColumn;
int64_t totalPoints;
int64_t totalStorage;
int64_t compStorage;
} SMnodeLoad;
typedef struct {
int32_t dnodeId;
int64_t clusterId;
......@@ -92,13 +80,16 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption);
void mndDestroy(const char *path);
/**
* @brief Get mnode statistics info.
* @brief Get mnode monitor info.
*
* @param pMnode The mnode object.
* @param pLoad Statistics of the mnode.
* @param pClusterInfo
* @param pVgroupInfo
* @param pGrantInfo
* @return int32_t 0 for success, -1 for failure.
*/
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
/**
* @brief Get user authentication info.
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MONITOR_H_
#define _TD_MONITOR_H_
#include "tarray.h"
#include "tdef.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int32_t dnode_id;
char dnode_ep[TSDB_EP_LEN];
} SMonBasicInfo;
typedef struct {
int32_t dnode_id;
char dnode_ep[TSDB_EP_LEN];
char status[8];
} SMonDnodeDesc;
typedef struct {
int32_t mnode_id;
char mnode_ep[TSDB_EP_LEN];
char role[8];
} SMonMnodeDesc;
typedef struct {
char first_ep[TSDB_EP_LEN];
int32_t first_ep_dnode_id;
char version[12];
float master_uptime; // day
int32_t monitor_interval; // sec
int32_t vgroups_total;
int32_t vgroups_alive;
int32_t vnodes_total;
int32_t vnodes_alive;
int32_t connections_total;
SArray *dnodes; // array of SMonDnodeDesc
SArray *mnodes; // array of SMonMnodeDesc
} SMonClusterInfo;
typedef struct {
int32_t dnode_id;
int8_t vnode_online;
char vnode_role[8];
} SMonVnodeDesc;
typedef struct {
int32_t vgroup_id;
SMonVnodeDesc vnodes[TSDB_MAX_REPLICA];
} SMonVgroupDesc;
typedef struct {
char database_name[TSDB_DB_NAME_LEN];
int32_t tables_num;
int8_t status;
SArray *vgroups; // array of SMonVgroupDesc
} SMonVgroupInfo;
typedef struct {
int32_t expire_time;
int32_t timeseries_used;
int32_t timeseries_total;
} SMonGrantInfo;
typedef struct {
float uptime; // day
float cpu_engine;
float cpu_system;
float cpu_cores;
float mem_engine; // MB
float mem_system; // MB
float mem_total; // MB
float disk_engine; // GB
float disk_used; // GB
float disk_total; // GB
float net_in; // Kb/s
float net_out; // Kb/s
float io_read; // Mb/s
float io_write; // Mb/s
float io_read_disk; // Mb/s
float io_write_disk; // Mb/s
int32_t req_select;
float req_select_rate;
int32_t req_insert;
int32_t req_insert_success;
float req_insert_rate;
int32_t req_insert_batch;
int32_t req_insert_batch_success;
float req_insert_batch_rate;
int32_t errors;
int32_t vnodes_num;
int32_t masters;
int32_t has_mnode;
} SMonDnodeInfo;
typedef struct {
char name[TSDB_FILENAME_LEN];
int32_t level;
SDiskSize size;
} SMonDiskDesc;
typedef struct {
SArray *disks; // array of SMonDiskDesc
} SMonDiskInfo;
typedef struct {
int64_t ts;
int8_t level;
char content[1024];
} SMonLogItem;
typedef struct SMonInfo SMonInfo;
typedef struct {
const char *server;
uint16_t port;
int32_t maxLogs;
} SMonCfg;
int32_t monInit(const SMonCfg *pCfg);
void monCleanup();
void monAddLogItem(SMonLogItem *pItem);
SMonInfo *monCreateMonitorInfo();
void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo);
void monSetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo);
void monSetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo);
void monSetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo);
void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo);
void monSetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo);
void monSendReport(SMonInfo *pMonitor);
void monCleanupMonitorInfo(SMonInfo *pMonitor);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MONITOR_H_*/
......@@ -23,8 +23,6 @@ extern "C" {
#include "catalog.h"
#include "planner.h"
struct SSchJob;
typedef struct SSchedulerCfg {
uint32_t maxJobNum;
} SSchedulerCfg;
......@@ -72,7 +70,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes);
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, int64_t *pJob, const char* sql, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......@@ -80,7 +78,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob);
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, int64_t *pJob);
/**
* Fetch query result from the remote query executor
......@@ -88,7 +86,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDa
* @param data
* @return
*/
int32_t schedulerFetchRows(struct SSchJob *pJob, void **data);
int32_t schedulerFetchRows(int64_t job, void **data);
/**
......@@ -102,7 +100,7 @@ int32_t schedulerFetchRows(struct SSchJob *pJob, void **data);
* Free the query job
* @param pJob
*/
void schedulerFreeJob(void *pJob);
void schedulerFreeJob(int64_t job);
void schedulerDestroy(void);
......
......@@ -42,7 +42,6 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage);
bool taosGetTotalSysMemoryKB(uint64_t *kb);
bool taosGetProcMemory(float *memoryUsedMB); //
bool taosGetSysMemory(float *memoryUsedMB); //
void taosGetDisk();
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize);
bool taosReadProcIO(int64_t *rchars, int64_t *wchars);
bool taosGetProcIO(float *readKB, float *writeKB);
......
......@@ -171,7 +171,7 @@ typedef struct SRequestSendRecvBody {
void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg;
struct SSchJob* pQueryJob; // query job, created according to sql query DAG.
int64_t queryJob; // query job, created according to sql query DAG.
struct SQueryDag* pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo;
} SRequestSendRecvBody;
......
......@@ -227,10 +227,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.pQueryJob != NULL) {
schedulerFreeJob(pRequest->body.pQueryJob);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
pRequest->code = code;
......@@ -240,8 +240,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.pQueryJob != NULL) {
schedulerFreeJob(pRequest->body.pQueryJob);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
}
......@@ -494,7 +494,7 @@ void* doFetchRow(SRequestObj* pRequest) {
}
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void**)&pResInfo->pData);
int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
return NULL;
......
aux_source_directory(src COMMON_SRC)
add_library(common ${COMMON_SRC})
add_library(common STATIC ${COMMON_SRC})
target_include_directories(
common
PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
......
......@@ -46,6 +46,13 @@ int32_t tsMaxBinaryDisplayWidth = 30;
bool tsEnableSlaveQuery = 1;
bool tsPrintAuth = 0;
// monitor
bool tsEnableMonitor = 1;
int32_t tsMonitorInterval = 5;
char tsMonitorFqdn[TSDB_FQDN_LEN] = {0};
uint16_t tsMonitorPort = 6043;
int32_t tsMonitorMaxLogs = 100;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
......@@ -314,6 +321,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1;
return 0;
}
......@@ -345,7 +359,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
}
static void taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_EP_LEN);
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
......@@ -425,6 +439,12 @@ static void taosSetServerCfg(SConfig *pCfg) {
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN);
tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32;
tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32;
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
}
......
......@@ -2112,6 +2112,9 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1;
if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1;
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
if (tEncodeU32(&encoder, pReq->hashBegin) < 0) return -1;
if (tEncodeU32(&encoder, pReq->hashEnd) < 0) return -1;
if (tEncodeI8(&encoder, pReq->hashMethod) < 0) return -1;
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
......@@ -2152,6 +2155,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->hashBegin) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->hashEnd) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->hashMethod) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
......
......@@ -22,6 +22,7 @@ bool tscValidateTableNameLength(size_t len) {
return len < TSDB_TABLE_NAME_LEN;
}
#if 0
// TODO refactor
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0 || src == NULL) {
......@@ -46,7 +47,7 @@ SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFil
return pFilter;
}
#endif
#if 0
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) {
......
......@@ -627,3 +627,50 @@ const char* fmtts(int64_t ts) {
return buf;
}
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) {
char ts[40] = {0};
struct tm* ptm;
int32_t fractionLen;
char* format = NULL;
time_t quot = 0;
long mod = 0;
switch (precision) {
case TSDB_TIME_PRECISION_MILLI: {
quot = t / 1000;
fractionLen = 5;
format = ".%03" PRId64;
mod = t % 1000;
break;
}
case TSDB_TIME_PRECISION_MICRO: {
quot = t / 1000000;
fractionLen = 8;
format = ".%06" PRId64;
mod = t % 1000000;
break;
}
case TSDB_TIME_PRECISION_NANO: {
quot = t / 1000000000;
fractionLen = 11;
format = ".%09" PRId64;
mod = t % 1000000000;
break;
}
default:
fractionLen = 0;
assert(false);
}
ptm = localtime(&quot);
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm);
length += snprintf(ts + length, fractionLen, format, mod);
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);
tstrncpy(buf, ts, bufLen);
}
\ No newline at end of file
aux_source_directory(src BNODE_SRC)
add_library(bnode ${BNODE_SRC})
add_library(bnode STATIC ${BNODE_SRC})
target_include_directories(
bnode
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/bnode"
......
......@@ -6,4 +6,4 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(taosd dnode util os)
target_link_libraries(taosd dnode)
aux_source_directory(src DNODE_SRC)
add_library(dnode STATIC ${DNODE_SRC})
target_link_libraries(
dnode
PUBLIC cjson
PUBLIC mnode
PUBLIC vnode
PUBLIC qnode
PUBLIC snode
PUBLIC bnode
PUBLIC wal
PUBLIC sync
PUBLIC taos
PUBLIC tfs
dnode cjson mnode vnode qnode snode bnode wal sync taos tfs monitor
)
target_include_directories(
dnode
......
......@@ -36,6 +36,7 @@ extern "C" {
#include "ttime.h"
#include "tworker.h"
#include "tglobal.h"
#include "monitor.h"
#include "dnode.h"
......
......@@ -32,6 +32,9 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -21,6 +21,7 @@
#include "dndSnode.h"
#include "dndTransport.h"
#include "dndVnodes.h"
#include "monitor.h"
#include "sync.h"
#include "tfs.h"
#include "wal.h"
......@@ -140,7 +141,7 @@ static int32_t dndInitDir(SDnode *pDnode, SDnodeObjCfg *pCfg) {
return 0;
}
static void dndCloseImp(SDnode *pDnode) {
static void dndCloseDir(SDnode *pDnode) {
tfree(pDnode->dir.mnode);
tfree(pDnode->dir.vnodes);
tfree(pDnode->dir.dnode);
......@@ -260,7 +261,7 @@ void dndClose(SDnode *pDnode) {
dndCleanupMgmt(pDnode);
tfsClose(pDnode->pTfs);
dndCloseImp(pDnode);
dndCloseDir(pDnode);
free(pDnode);
dInfo("dnode object is closed, data:%p", pDnode);
}
......@@ -289,11 +290,7 @@ int32_t dndInit() {
}
SVnodeOpt vnodeOpt = {
.sver = tsVersion,
.nthreads = tsNumOfCommitThreads,
.putReqToVQueryQFp = dndPutReqToVQueryQ,
.sendReqToDnodeFp = dndSendReqToDnode
};
.nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = dndSendReqToDnode};
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr());
......@@ -301,6 +298,13 @@ int32_t dndInit() {
return -1;
}
SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn};
if (monInit(&monCfg) != 0) {
dError("failed to init monitor since %s", terrstr());
dndCleanup();
return -1;
}
dInfo("dnode env is initialized");
return 0;
}
......@@ -314,19 +318,8 @@ void dndCleanup() {
walCleanUp();
vnodeCleanup();
rpcCleanup();
monCleanup();
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
}
// OTHER FUNCTIONS ===================================
void taosGetDisk() {
#if 0
const double unit = 1024 * 1024 * 1024;
SDiskSize diskSize = tfsGetSize(pTfs);
tfsUpdateSize(&fsMeta);
#endif
}
\ No newline at end of file
......@@ -22,6 +22,7 @@
#include "dndTransport.h"
#include "dndVnodes.h"
#include "dndWorker.h"
#include "monitor.h"
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
......@@ -473,19 +474,62 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
rpcSendResponse(&rpcRsp);
}
void dndGetBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->dnode_id = dndGetDnodeId(pDnode);
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
}
static void dndSendMonitorReport(SDnode *pDnode) {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0) return;
SMonInfo *pMonitor = monCreateMonitorInfo();
if (pMonitor == NULL) return;
dTrace("pDnode:%p, send monitor report to %s:%u", pDnode, tsMonitorFqdn, tsMonitorPort);
SMonBasicInfo basicInfo = {0};
dndGetBasicInfo(pDnode, &basicInfo);
monSetBasicInfo(pMonitor, &basicInfo);
SMonClusterInfo clusterInfo = {0};
SMonVgroupInfo vgroupInfo = {0};
SMonGrantInfo grantInfo = {0};
if (dndGetMnodeMonitorInfo(pDnode, &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo);
monSetGrantInfo(pMonitor, &grantInfo);
}
monSendReport(pMonitor);
monCleanupMonitorInfo(pMonitor);
}
static void *dnodeThreadRoutine(void *param) {
SDnode *pDnode = param;
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
int32_t ms = tsStatusInterval * 1000;
int64_t lastStatusTime = taosGetTimestampMs();
int64_t lastMonitorTime = lastStatusTime;
setThreadName("dnode-hb");
while (true) {
pthread_testcancel();
taosMsleep(ms);
taosMsleep(200);
if (dndGetStat(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) {
continue;
}
if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) {
int64_t curTime = taosGetTimestampMs();
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
dndSendStatusReq(pDnode);
lastStatusTime = curTime;
}
float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
if (monitorInterval >= tsMonitorInterval) {
dndSendMonitorReport(pDnode);
lastMonitorTime = curTime;
}
}
}
......
......@@ -630,3 +630,13 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc
dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt);
return code;
}
int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL) return -1;
int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
dndReleaseMnode(pDnode, pMnode);
return code;
}
\ No newline at end of file
......@@ -523,6 +523,9 @@ static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->walCfg.rollPeriod = 128;
pCfg->walCfg.segSize = 128;
pCfg->walCfg.vgId = pCreate->vgId;
pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd;
pCfg->hashMethod = pCreate->hashMethod;
}
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
......
aux_source_directory(src SUT_SRC)
add_library(sut STATIC ${SUT_SRC})
add_library(sut STATIC STATIC ${SUT_SRC})
target_link_libraries(
sut
PUBLIC dnode
......
aux_source_directory(src MNODE_SRC)
add_library(mnode ${MNODE_SRC})
add_library(mnode STATIC ${MNODE_SRC})
target_include_directories(
mnode
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mnode"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
mnode
PRIVATE scheduler
PRIVATE sdb
PRIVATE wal
PRIVATE transport
PRIVATE cjson
PRIVATE sync
mnode scheduler sdb wal transport cjson sync monitor
)
if(${BUILD_TEST})
......
......@@ -21,11 +21,11 @@
#include "sdb.h"
#include "tcache.h"
#include "tep.h"
#include "tglobal.h"
#include "tqueue.h"
#include "ttime.h"
#include "wal.h"
#include "version.h"
#include "tglobal.h"
#include "wal.h"
#ifdef __cplusplus
extern "C" {
......@@ -38,6 +38,20 @@ typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *p
typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct SMnodeLoad {
int64_t numOfDnode;
int64_t numOfMnode;
int64_t numOfVgroup;
int64_t numOfDatabase;
int64_t numOfSuperTable;
int64_t numOfChildTable;
int64_t numOfNormalTable;
int64_t numOfColumn;
int64_t totalPoints;
int64_t totalStorage;
int64_t compStorage;
} SMnodeLoad;
typedef struct {
const char *name;
MndInitFp initFp;
......@@ -104,7 +118,9 @@ int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg);
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
uint64_t mndGenerateUid(char *name, int32_t len) ;
uint64_t mndGenerateUid(char *name, int32_t len);
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
#ifdef __cplusplus
}
......
......@@ -215,6 +215,9 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.replica = pVgroup->replica;
createReq.selfIndex = -1;
createReq.streamMode = pVgroup->streamMode;
createReq.hashBegin = pVgroup->hashBegin;
createReq.hashEnd = pVgroup->hashEnd;
createReq.hashMethod = pDb->hashMethod;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v];
......
......@@ -22,6 +22,7 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndProfile.h"
......@@ -36,7 +37,6 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndInfoSchema.h"
#define MQ_TIMER_MS 3000
#define TRNAS_TIMER_MS 6000
......@@ -400,6 +400,11 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
return 0;
}
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
return 0;
}
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
if (pMsg == NULL) {
......@@ -505,10 +510,9 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
}
}
// Note: uid 0 is reserved
uint64_t mndGenerateUid(char *name, int32_t len) {
int32_t hashval = MurmurHash3_32(name, len);
int32_t hashval = MurmurHash3_32(name, len);
do {
int64_t us = taosGetTimestampUs();
......
aux_source_directory(src MNODE_SRC)
add_library(sdb ${MNODE_SRC})
add_library(sdb STATIC ${MNODE_SRC})
target_include_directories(
sdb
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mnode/sdb"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
sdb
PRIVATE os
PRIVATE common
PRIVATE util
sdb os common util
)
\ No newline at end of file
aux_source_directory(src QNODE_SRC)
add_library(qnode ${QNODE_SRC})
add_library(qnode STATIC ${QNODE_SRC})
target_include_directories(
qnode
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/qnode"
......
aux_source_directory(src SNODE_SRC)
add_library(snode ${SNODE_SRC})
add_library(snode STATIC ${SNODE_SRC})
target_include_directories(
snode
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/snode"
......
......@@ -27,12 +27,12 @@ extern "C" {
typedef struct SDataStatis {
int16_t colId;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SDataStatis;
typedef struct STable {
......@@ -53,6 +53,8 @@ typedef struct STsdb STsdb;
typedef struct STsdbCfg {
int8_t precision;
int8_t update;
int8_t compression;
uint64_t lruCacheSize;
int32_t daysPerFile;
int32_t minRowsPerFileBlock;
......@@ -60,8 +62,6 @@ typedef struct STsdbCfg {
int32_t keep;
int32_t keep1;
int32_t keep2;
int8_t update;
int8_t compression;
} STsdbCfg;
// query condition to build multi-table data block iterator
......
......@@ -57,13 +57,12 @@ typedef struct {
SMetaCfg metaCfg;
STqCfg tqCfg;
SWalCfg walCfg;
uint32_t hashBegin;
uint32_t hashEnd;
int8_t hashMethod;
} SVnodeCfg;
typedef struct {
int32_t sver;
const char *timezone;
const char *locale;
const char *charset;
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutReqToVQueryQFp putReqToVQueryQFp;
SendReqToDnodeFp sendReqToDnodeFp;
......
......@@ -18,8 +18,6 @@
#include "tsdbFile.h"
#define TSDB_FS_VERSION 0
// ================== TSDB global config
extern bool tsdbForceKeepFile;
......
......@@ -44,7 +44,37 @@
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
typedef enum {
TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data
TSDB_FILE_LAST, // .last
TSDB_FILE_SMAD, // .smad(Block-wise SMA)
TSDB_FILE_SMAL, // .smal(Block-wise SMA)
TSDB_FILE_MAX, //
TSDB_FILE_META, // meta
TSDB_FILE_TSMA, // .tsma.${sma_index_name}, Time-range-wise SMA
TSDB_FILE_RSMA, // .rsma.${sma_index_name}, Time-range-wise Rollup SMA
} TSDB_FILE_T;
typedef enum {
TSDB_FS_VER_0 = 0,
TSDB_FS_VER_MAX,
} ETsdbFsVer;
#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
switch (fType) {
case TSDB_FILE_HEAD: // .head
case TSDB_FILE_DATA: // .data
case TSDB_FILE_LAST: // .last
case TSDB_FILE_SMAD: // .smad(Block-wise SMA)
case TSDB_FILE_SMAL: // .smal(Block-wise SMA)
default:
return TSDB_LATEST_FVER;
}
}
#if 0
// =============== SMFile
......@@ -169,6 +199,7 @@ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nby
// =============== SDFile
typedef struct {
uint32_t magic;
uint32_t fver;
uint32_t len;
uint32_t totalBlocks;
uint32_t totalSubBlocks;
......@@ -188,7 +219,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader, TSDB_FILE_T fType);
int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
......@@ -292,12 +323,18 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
// =============== SDFileSet
typedef struct {
int fid;
int state;
SDFile files[TSDB_FILE_MAX];
int fid;
int8_t state; // -128~127
uint8_t ver; // 0~255, DFileSet version
uint16_t reserve;
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
#define TSDB_LATEST_FSET_VER 0
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_FSET_STATE(s) ((s)->state)
#define TSDB_FSET_VER(s) ((s)->ver)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
......
......@@ -36,6 +36,7 @@ typedef struct {
TSKEY maxKey;
} SBlockIdx;
#ifdef TD_REFACTOR_3
typedef struct {
int64_t last : 1;
int64_t offset : 63;
......@@ -49,22 +50,35 @@ typedef struct {
TSKEY keyLast;
} SBlock;
#else
typedef enum {
TSDB_SBLK_VER_0 = 0,
TSDB_SBLK_VER_MAX,
} ESBlockVer;
#define SBlockVerLatest TSDB_SBLK_VER_0
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8;
int32_t numOfRows : 24;
uint8_t reserve0;
uint8_t last : 1;
uint8_t blkVer : 7;
uint8_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
uint32_t len : 32; // data block length
uint32_t len; // data block length
uint32_t keyLen : 24; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
uint32_t reserve1 : 8;
uint64_t blkVer : 8;
uint64_t aggrOffset : 56;
uint32_t reserve : 8;
int32_t algorithm : 8;
int32_t numOfRows : 24;
int64_t offset;
uint64_t aggrStat : 1;
uint64_t aggrOffset : 63;
TSKEY keyFirst;
TSKEY keyLast;
} SBlock_3;
} SBlockV0;
#define SBlock SBlockV0 // latest SBlock definition
#endif
typedef struct {
int32_t delimiter; // For recovery usage
......@@ -73,6 +87,7 @@ typedef struct {
SBlock blocks[];
} SBlockInfo;
#ifdef TD_REFACTOR_3
typedef struct {
int16_t colId;
uint16_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
......@@ -89,17 +104,50 @@ typedef struct {
uint8_t offsetH;
char padding[1];
} SBlockCol;
#else
typedef struct {
int16_t colId;
uint8_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint8_t reserve : 7;
uint8_t type;
int32_t len;
uint32_t offset;
} SBlockColV0;
#define SBlockCol SBlockColV0 // latest SBlockCol definition
typedef struct {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SAggrBlkColV0;
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
#endif
// Code here just for back-ward compatibility
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
#ifdef TD_REFACTOR_3
pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1);
pBlockCol->offsetH = (uint8_t)(offset >> 24);
#else
pBlockCol->offset = offset;
#endif
}
static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
#ifdef TD_REFACTOR_3
uint32_t offset1 = pBlockCol->offset;
uint32_t offset2 = pBlockCol->offsetH;
return (offset1 | (offset2 << 24));
#else
return pBlockCol->offset;
#endif
}
typedef struct {
......@@ -109,31 +157,57 @@ typedef struct {
SBlockCol cols[];
} SBlockData;
typedef void SAggrBlkData; // SBlockCol cols[];
struct SReadH {
STsdb * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo *pBlkInfo;
SBlockData *pBlkData; // Block info
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
STsdb * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo * pBlkInfo;
SBlockData * pBlkData; // Block info
SAggrBlkData *pAggrBlkData; // Aggregate Block info
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
void * pExBuf; // extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
case TSDB_SBLK_VER_0:
default:
return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
}
}
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
case TSDB_SBLK_VER_0:
default:
return TSDB_BLOCK_AGGR_SIZE(nCols, 0);
}
}
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
void tsdbDestroyReadH(SReadH *pReadh);
......@@ -147,7 +221,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
void * pBuf = *ppBuf;
......
......@@ -50,8 +50,11 @@ typedef struct {
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
......@@ -509,7 +512,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) {
if (tsdbCreateDFile(pRepo, pWHeadf, true, TSDB_FILE_HEAD) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tstrerror(terrno));
......@@ -560,7 +563,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) {
if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
......@@ -572,6 +575,74 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
}
// TSDB_FILE_SMAD
SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh));
SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith);
if (access(TSDB_FILE_FULL_NAME(pRSmadF), F_OK) != 0) {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF));
tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD);
if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbInitDFileEx(pWSmadF, pRSmadF);
if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
// TSDB_FILE_SMAL
SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);
if ((pCommith->isLFileSame) && access(TSDB_FILE_FULL_NAME(pRSmalF), F_OK) == 0) {
tsdbInitDFileEx(pWSmalF, pRSmalF);
if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF));
tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);
if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
}
return 0;
......@@ -1131,41 +1202,57 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper, void **ppBuf, void **ppCBuf) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData;
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData * pBlockData = NULL;
SAggrBlkData *pAggrBlkData = NULL;
int64_t offset = 0, offsetAggr = 0;
int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
// Make buffer space
if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
return -1;
}
pBlockData = (SBlockData *)(*ppBuf);
if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
return -1;
}
pAggrBlkData = (SAggrBlkData *)(*ppExBuf);
// Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol * pBlockCol = pBlockData->cols + nColsNotAllNull;
SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsNotAllNull;
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
continue;
}
memset(pBlockCol, 0, sizeof(*pBlockCol));
memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol));
pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type;
pAggrBlkCol->colId = pDataCol->colId;
if (tDataTypes[pDataCol->type].statisFunc) {
#if 0
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
if (pBlockCol->numOfNull == 0) {
#endif
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max),
&(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex),
&(pAggrBlkCol->numOfNull));
if (pAggrBlkCol->numOfNull == 0) {
TD_SET_COL_ROWS_NORM(pBlockCol);
} else {
TD_SET_COL_ROWS_MISC(pBlockCol);
......@@ -1181,13 +1268,14 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// Compress the data if neccessary
int tcol = 0; // counter of not all NULL and written columns
uint32_t toffset = 0;
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull);
int32_t tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest);
int32_t lsize = tsize;
uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsNotAllNull, SBlockVerLatest);
int32_t keyLen = 0;
int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite);
int32_t tBitmaps = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
for (int ncol = 0; ncol < pDataCols->numOfCols; ++ncol) {
// All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break;
......@@ -1248,7 +1336,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
if (ncol != 0) {
tsdbSetBlockColOffset(pBlockCol, toffset);
pBlockCol->len = flen;
tcol++;
++tcol;
} else {
keyLen = flen;
}
......@@ -1269,6 +1357,18 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
return -1;
}
uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0;
if (aggrStatus > 0) {
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));
// Write the whole block to file
if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) {
return -1;
}
}
// Update pBlock membership vairables
pBlock->last = isLast;
pBlock->offset = offset;
......@@ -1280,6 +1380,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
pBlock->aggrStat = aggrStatus;
pBlock->blkVer = SBlockVerLatest;
pBlock->aggrOffset = (uint64_t)offsetAggr;
tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
......@@ -1291,9 +1394,10 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast,
isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile,
isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols,
pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith))));
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
......
......@@ -38,6 +38,8 @@ typedef struct {
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
......
......@@ -422,7 +422,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
return -1;
}
fsheader.version = TSDB_FS_VERSION;
fsheader.version = TSDB_LATEST_SFS_VER;
if (taosArrayGetSize(pStatus->df) == 0) {
fsheader.len = 0;
} else {
......@@ -697,7 +697,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
ptr = tsdbDecodeFSHeader(ptr, &fsheader);
ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta));
if (fsheader.version != TSDB_FS_VERSION) {
if (fsheader.version != TSDB_LATEST_SFS_VER) {
// TODO: handle file version change
}
......
......@@ -19,8 +19,12 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"head", // TSDB_FILE_HEAD
"data", // TSDB_FILE_DATA
"last", // TSDB_FILE_LAST
"smad", // TSDB_FILE_SMAD
"smal", // TSDB_FILE_SMAL
"", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META
"tsma", // TSDB_FILE_TSMA
"rsma", // TSDB_FILE_RSMA
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
......@@ -304,6 +308,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname);
tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname);
......@@ -341,7 +346,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
}
static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
char *aname;
char *aname = NULL;
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
buf = taosDecodeString(buf, &aname);
......@@ -352,7 +357,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
return buf;
}
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
......@@ -382,6 +387,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
}
pDFile->info.size += TSDB_FILE_HEAD_SIZE;
pDFile->info.fver = tsdbGetDFSVersion(fType);
if (tsdbUpdateDFileHeader(pDFile) < 0) {
tsdbCloseDFile(pDFile);
......@@ -493,6 +499,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
......@@ -505,6 +512,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
......@@ -562,8 +570,10 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
// ============== Operations on SDFileSet
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver) {
pSet->fid = fid;
pSet->state = 0;
TSDB_FSET_FID(pSet) = fid;
TSDB_FSET_VER(pSet) = TSDB_LATEST_FSET_VER;
TSDB_FSET_STATE(pSet) = 0;
pSet->reserve = 0;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
......@@ -572,7 +582,7 @@ void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint3
}
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
pSet->fid = pOSet->fid;
TSDB_FSET_FID(pSet) = TSDB_FSET_FID(pOSet);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype));
}
......@@ -581,7 +591,10 @@ void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet));
// state not included
tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet));
tlen += taosEncodeFixedU16(buf, pSet->reserve);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -590,11 +603,11 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
}
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet)));
TSDB_FSET_STATE(pSet) = 0;
buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet)));
buf = taosDecodeFixedU16(buf, &(pSet->reserve));
buf = taosDecodeFixedI32(buf, &(fid));
pSet->state = 0;
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
buf = tsdbDecodeSDFile(pRepo, buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -604,7 +617,9 @@ void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet));
tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet));
tlen += taosEncodeFixedU16(buf, pSet->reserve);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -613,10 +628,10 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
}
void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet)));
buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet)));
buf = taosDecodeFixedU16(buf, &(pSet->reserve));
buf = taosDecodeFixedI32(buf, &(fid));
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -637,7 +652,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) {
tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet);
return -1;
......
......@@ -821,9 +821,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// file block with sub-blocks has no statistics data
if (pBlock->numOfSubBlocks <= 1) {
tsdbLoadBlockStatis(pReadh, pBlock);
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
loadStatisData = true;
if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) {
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock);
loadStatisData = true;
}
}
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
......
......@@ -3276,8 +3276,12 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
}
int64_t stime = taosGetTimestampUs();
if (tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock) < 0) {
int statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
if (statisStatus < TSDB_STATIS_OK) {
return terrno;
} else if (statisStatus > TSDB_STATIS_OK) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
}
int16_t* colIds = pHandle->defaultLoadColumn->pData;
......@@ -3288,7 +3292,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
pHandle->statis[i].colId = colIds[i];
}
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols);
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock);
// always load the first primary timestamp column data
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
......
......@@ -19,6 +19,7 @@
static void tsdbResetReadTable(SReadH *pReadh);
static void tsdbResetReadFile(SReadH *pReadh);
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock);
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int numOfBitmaps, int lenOfBitmaps, int maxPoints, char *buffer,
......@@ -63,10 +64,12 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
void tsdbDestroyReadH(SReadH *pReadh) {
if (pReadh == NULL) return;
pReadh->pExBuf = taosTZfree(pReadh->pExBuf);
pReadh->pCBuf = taosTZfree(pReadh->pCBuf);
pReadh->pBuf = taosTZfree(pReadh->pBuf);
pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]);
pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]);
pReadh->pAggrBlkData = taosTZfree(pReadh->pAggrBlkData);
pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo);
pReadh->cidx = 0;
......@@ -305,15 +308,58 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (!pBlock->aggrStat) {
return TSDB_STATIS_NONE;
}
SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRIu64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset,
tstrerror(terrno));
return -1;
}
size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
if (nreadAggr < 0) {
tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
(uint64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
if (nreadAggr < sizeAggr) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRIu64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr,
nreadAggr);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
return 0;
}
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
size_t size = tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1;
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
......@@ -337,7 +383,6 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1;
}
return 0;
}
......@@ -375,13 +420,14 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
return buf;
}
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock) {
#ifdef TD_REFACTOR_3
SBlockData *pBlockData = pReadh->pBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlockData->numOfCols) {
pStatis[i].numOfNull = -1;
i++;
++i;
continue;
}
......@@ -392,15 +438,45 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
pStatis[i].maxIndex = pBlockData->cols[j].maxIndex;
pStatis[i].minIndex = pBlockData->cols[j].minIndex;
pStatis[i].numOfNull = pBlockData->cols[j].numOfNull;
i++;
j++;
++i;
++j;
} else if (pStatis[i].colId < pBlockData->cols[j].colId) {
pStatis[i].numOfNull = -1;
i++;
++i;
} else {
j++;
++j;
}
}
#else
if (pBlock->aggrStat) {
SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlock->numOfCols) {
pStatis[i].numOfNull = -1;
++i;
continue;
}
SAggrBlkCol *pAggrBlkCol = ((SAggrBlkCol *)(pAggrBlkData)) + j;
if (pStatis[i].colId == pAggrBlkCol->colId) {
pStatis[i].sum = pAggrBlkCol->sum;
pStatis[i].max = pAggrBlkCol->max;
pStatis[i].min = pAggrBlkCol->min;
pStatis[i].maxIndex = pAggrBlkCol->maxIndex;
pStatis[i].minIndex = pAggrBlkCol->minIndex;
pStatis[i].numOfNull = pAggrBlkCol->numOfNull;
++i;
++j;
} else if (pStatis[i].colId < pAggrBlkCol->colId) {
pStatis[i].numOfNull = -1;
++i;
} else {
++j;
}
}
}
#endif
}
static void tsdbResetReadTable(SReadH *pReadh) {
......@@ -449,7 +525,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
return -1;
}
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
......@@ -592,7 +668,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols(pDataCols);
// If only load timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadBlockStatis(pReadh, pBlock) < 0) return -1;
if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;
pDataCols->numOfRows = pBlock->numOfRows;
......@@ -686,7 +762,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;
int64_t offset = pBlock->offset + TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols) + tsdbGetBlockColOffset(pBlockCol);
int64_t offset = pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) +
tsdbGetBlockColOffset(pBlockCol);
if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block column data while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno));
......
......@@ -30,6 +30,9 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
cfg.pDnode = pVnodeCfg->pDnode;
cfg.pTfs = pVnodeCfg->pTfs;
cfg.dbId = pVnodeCfg->dbId;
cfg.hashBegin = pVnodeCfg->hashBegin;
cfg.hashEnd = pVnodeCfg->hashEnd;
cfg.hashMethod = pVnodeCfg->hashMethod;
}
// Validate options
......
......@@ -14,5 +14,6 @@ add_subdirectory(function)
add_subdirectory(qcom)
add_subdirectory(qworker)
add_subdirectory(tfs)
add_subdirectory(monitor)
add_subdirectory(nodes)
add_subdirectory(scalar)
aux_source_directory(src CACHE_SRC)
add_library(cache ${CACHE_SRC})
add_library(cache STATIC ${CACHE_SRC})
target_include_directories(
cache
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/cache"
......
aux_source_directory(src CATALOG_SRC)
add_library(catalog ${CATALOG_SRC})
add_library(catalog STATIC ${CATALOG_SRC})
target_include_directories(
catalog
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog"
......
......@@ -60,6 +60,7 @@ typedef struct SCtgDebug {
bool lockDebug;
bool cacheDebug;
bool apiDebug;
bool metaDebug;
uint32_t showCachePeriodSec;
} SCtgDebug;
......@@ -119,6 +120,10 @@ typedef struct SCatalogStat {
SCtgCacheStat cache;
} SCatalogStat;
typedef struct SCtgUpdateMsgHeader {
SCatalog* pCtg;
} SCtgUpdateMsgHeader;
typedef struct SCtgUpdateVgMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
......@@ -145,6 +150,14 @@ typedef struct SCtgRemoveStbMsg {
uint64_t suid;
} SCtgRemoveStbMsg;
typedef struct SCtgRemoveTblMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
} SCtgRemoveTblMsg;
typedef struct SCtgMetaAction {
int32_t act;
void *data;
......@@ -189,19 +202,21 @@ typedef struct SCtgAction {
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE)
#define CTG_FLAG_STB 0x1
#define CTG_FLAG_NOT_STB 0x2
#define CTG_FLAG_UNKNOWN_STB 0x4
#define CTG_FLAG_INF_DB 0x8
#define CTG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB)
#define CTG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB)
#define CTG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB)
#define CTG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB)
#define CTG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB)
#define CTG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0)
#define CTG_GEN_STB_FLAG(_isStb) ((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)
#define CTG_TBTYPE_MATCH(_flag, tbType) (CTG_IS_UNKNOWN_STB(_flag) || (CTG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_FLAG_STB 0x1
#define CTG_FLAG_NOT_STB 0x2
#define CTG_FLAG_UNKNOWN_STB 0x4
#define CTG_FLAG_INF_DB 0x8
#define CTG_FLAG_FORCE_UPDATE 0x10
#define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB)
#define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB)
#define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB)
#define CTG_FLAG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB)
#define CTG_FLAG_IS_FORCE_UPDATE(_flag) ((_flag) & CTG_FLAG_FORCE_UPDATE)
#define CTG_FLAG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB)
#define CTG_FLAG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0)
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_IS_INF_DBNAME(_dbname) ((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB)))
......
此差异已折叠。
aux_source_directory(src FUNCTION_SRC)
add_library(function ${FUNCTION_SRC})
add_library(function STATIC ${FUNCTION_SRC})
target_include_directories(
function
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/function"
......
aux_source_directory(src INDEX_SRC)
add_library(index ${INDEX_SRC})
add_library(index STATIC ${INDEX_SRC})
target_include_directories(
index
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index"
......
aux_source_directory(src MONITOR_SRC)
add_library(monitor STATIC ${MONITOR_SRC})
target_include_directories(
monitor
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/monitor"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(monitor os util common)
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MONITOR_INT_H_
#define _TD_MONITOR_INT_H_
#include "monitor.h"
#include "tarray.h"
#include "tlockfree.h"
#include "tjson.h"
typedef struct {
SRWLatch lock;
SArray *logs; // array of SMonLogItem
int32_t maxLogs;
const char *server;
uint16_t port;
} SMonitor;
typedef struct SMonInfo {
SArray *logs; // array of SMonLogItem
SJson *pJson;
} SMonInfo;
#ifdef __cplusplus
}
#endif
#endif /*_TD_MONITOR_INT_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "monInt.h"
#include "taoserror.h"
#include "thttp.h"
#include "tlog.h"
#include "ttime.h"
static SMonitor tsMonitor = {0};
int32_t monInit(const SMonCfg *pCfg) {
tsMonitor.logs = taosArrayInit(16, sizeof(SMonInfo));
if (tsMonitor.logs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tsMonitor.maxLogs = pCfg->maxLogs;
tsMonitor.server = pCfg->server;
tsMonitor.port = pCfg->port;
taosInitRWLatch(&tsMonitor.lock);
return 0;
}
void monCleanup() {
taosArrayDestroy(tsMonitor.logs);
tsMonitor.logs = NULL;
}
void monAddLogItem(SMonLogItem *pItem) {
taosWLockLatch(&tsMonitor.lock);
int32_t size = taosArrayGetSize(tsMonitor.logs);
if (size > tsMonitor.maxLogs) {
uInfo("too many logs for monitor");
} else {
taosArrayPush(tsMonitor.logs, pItem);
}
taosWUnLockLatch(&tsMonitor.lock);
}
SMonInfo *monCreateMonitorInfo() {
SMonInfo *pMonitor = calloc(1, sizeof(SMonInfo));
if (pMonitor == NULL) return NULL;
taosWLockLatch(&tsMonitor.lock);
pMonitor->logs = taosArrayDup(tsMonitor.logs);
taosArrayClear(tsMonitor.logs);
taosWUnLockLatch(&tsMonitor.lock);
pMonitor->pJson = tjsonCreateObject();
if (pMonitor->pJson == NULL || pMonitor->logs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
monCleanupMonitorInfo(pMonitor);
return NULL;
}
return pMonitor;
}
void monCleanupMonitorInfo(SMonInfo *pMonitor) {
taosArrayDestroy(pMonitor->logs);
tjsonDelete(pMonitor->pJson);
free(pMonitor);
}
void monSendReport(SMonInfo *pMonitor) {
char *pCont = tjsonToString(pMonitor->pJson);
if (pCont != NULL) {
taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont));
free(pCont);
}
}
void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo) {
SJson *pJson = pMonitor->pJson;
tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id);
tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep);
int64_t ms = taosGetTimestampMs();
char buf[40] = {0};
taosFormatUtcTime(buf, sizeof(buf), ms, TSDB_TIME_PRECISION_MILLI);
tjsonAddStringToObject(pJson, "ts", buf);
}
void monSetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo) {
}
void monSetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo) {
}
void monSetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo) {
}
void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) {
}
void monSetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo) {
}
enable_testing()
aux_source_directory(. MONITOR_TEST_SRC)
add_executable(monitor_test ${MONITOR_TEST_SRC})
target_link_libraries(
monitor_test
PUBLIC monitor
PUBLIC gtest_main
)
add_test(
NAME monitor_test
COMMAND monitor_test
)
/**
* @file monTest.cpp
* @author slguan (slguan@taosdata.com)
* @brief monitor module tests
* @version 1.0
* @date 2022-03-05
*
* @copyright Copyright (c) 2022
*
*/
#include <gtest/gtest.h>
#include "os.h"
#include "monitor.h"
class MonitorTest : public ::testing::Test {
protected:
static void SetUpTestSuite() { root = "/tmp/monTest"; }
static void TearDownTestSuite() {}
public:
void SetUp() override {}
void TearDown() override {}
static const char *root;
};
const char *MonitorTest::root;
TEST_F(MonitorTest, 01_Open_Close) {
}
aux_source_directory(src PARSER_SRC)
add_library(parser ${PARSER_SRC})
add_library(parser STATIC ${PARSER_SRC})
target_include_directories(
parser
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/parser"
......
aux_source_directory(src PLANNER_SRC)
add_library(planner ${PLANNER_SRC})
add_library(planner STATIC ${PLANNER_SRC})
target_include_directories(
planner
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/planner"
......
aux_source_directory(src QUERY_SRC)
add_library(qcom ${QUERY_SRC})
add_library(qcom STATIC ${QUERY_SRC})
target_include_directories(
qcom
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qcom"
......
aux_source_directory(src SCHEDULER_SRC)
add_library(scheduler ${SCHEDULER_SRC})
add_library(scheduler STATIC ${SCHEDULER_SRC})
target_include_directories(
scheduler
......
......@@ -36,6 +36,11 @@ enum {
SCH_WRITE,
};
typedef struct SSchTrans {
void *transInst;
void *transHandle;
} SSchTrans;
typedef struct SSchApiStat {
} SSchApiStat;
......@@ -59,12 +64,13 @@ typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob*
int32_t jobRef;
SSchedulerStat stat;
} SSchedulerMgmt;
typedef struct SSchCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
} SSchCallbackParam;
......@@ -75,7 +81,8 @@ typedef struct SSchLevel {
int32_t taskFailed;
int32_t taskSucceed;
int32_t taskNum;
SArray *subTasks; // Element is SQueryTask
int32_t taskLaunchIdx; // launch startup index
SArray *subTasks; // Element is SQueryTask
} SSchLevel;
typedef struct SSchTask {
......@@ -105,6 +112,7 @@ typedef struct SSchJobAttr {
} SSchJobAttr;
typedef struct SSchJob {
int64_t refId;
uint64_t queryId;
SSchJobAttr attr;
int32_t levelNum;
......@@ -119,7 +127,6 @@ typedef struct SSchJob {
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
int32_t ref;
int8_t status;
SQueryNodeAddr resNode;
tsem_t rspSem;
......@@ -168,6 +175,8 @@ typedef struct SSchJob {
static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
#ifdef __cplusplus
}
......
......@@ -17,12 +17,17 @@
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"
typedef struct SSchTrans {
void *transInst;
void *transHandle;
}SSchTrans;
static SSchedulerMgmt schMgmt = {0};
SSchedulerMgmt schMgmt = {0};
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
}
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
return taosReleaseRef(schMgmt.jobRef, refId);
}
uint64_t schGenTaskId(void) {
return atomic_add_fetch_64(&schMgmt.taskId, 1);
......@@ -886,7 +891,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
case TDMT_VND_DROP_TASK_RSP: {
// SHOULD NEVER REACH HERE
SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref));
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
......@@ -908,28 +913,23 @@ _return:
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchJob *pJob = NULL;
SSchTask *pTask = NULL;
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) {
qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId);
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, pParam->refId);
if (NULL == pJob) {
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, pParam->queryId, pParam->taskId, pParam->refId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
pJob = *job;
atomic_add_fetch_32(&pJob->ref, 1);
int32_t s = taosHashGetSize(pJob->execTasks);
if (s <= 0) {
qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId);
SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId);
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
......@@ -942,7 +942,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
_return:
if (pJob) {
atomic_sub_fetch_32(&pJob->ref, 1);
taosReleaseRef(schMgmt.jobRef, pParam->refId);
}
tfree(param);
......@@ -1003,28 +1003,29 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
}
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0;
SSchTrans *trans = (SSchTrans *)transport;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam));
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->queryId = qId;
param->taskId = tId;
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = pTask->taskId;
pMsgSendInfo->param = param;
......@@ -1040,7 +1041,7 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
SCH_ERR_JRET(code);
}
qDebug("QID:0x%"PRIx64 ",TID:0x%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS;
_return:
......@@ -1160,7 +1161,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
atomic_store_32(&pTask->lastMsgType, msgType);
SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle};
SCH_ERR_JRET(schAsyncSendMsg(&trans, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize));
if (isCandidateAddr) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
......@@ -1283,7 +1284,60 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->failTasks);
}
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, const char* sql, bool syncSchedule) {
int32_t schCancelJob(SSchJob *pJob) {
//TODO
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
void schFreeJobImpl(void *job) {
if (NULL == job) {
return;
}
SSchJob *pJob = job;
uint64_t queryId = pJob->queryId;
int64_t refId = pJob->refId;
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
}
schDropJobAllTasks(pJob);
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for(int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
schFreeTask(pTask);
}
taosArrayDestroy(pLevel->subTasks);
}
taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks);
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);
tfree(pJob->res);
tfree(pJob);
qDebug("QID:0x%"PRIx64" job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
}
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, int64_t *job, const char* sql, bool syncSchedule) {
qDebug("QID:0x%"PRIx64" job started", pDag->queryId);
if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) {
......@@ -1327,21 +1381,20 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDa
tsem_init(&pJob->rspSem, 0, 0);
code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} else {
SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
if (pJob->refId < 0) {
SCH_JOB_ELOG("taosHashPut job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
pJob->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(pJob));
*(SSchJob **)job = pJob;
taosAcquireRef(schMgmt.jobRef, pJob->refId);
*job = pJob->refId;
if (syncSchedule) {
SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob));
......@@ -1349,25 +1402,20 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDa
}
SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
taosReleaseRef(schMgmt.jobRef, pJob->refId);
return TSDB_CODE_SUCCESS;
_return:
*(SSchJob **)job = NULL;
schedulerFreeJob(pJob);
schFreeJobImpl(pJob);
SCH_RET(code);
}
int32_t schCancelJob(SSchJob *pJob) {
//TODO
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobs) {
if (schMgmt.jobRef) {
qError("scheduler already initialized");
return TSDB_CODE_QRY_INVALID_INPUT;
}
......@@ -1381,9 +1429,9 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.jobs) {
schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
if (schMgmt.jobRef < 0) {
qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1398,24 +1446,28 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes) {
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, int64_t *pJob, const char* sql, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
pRes->code = atomic_load_32(&(*pJob)->errCode);
pRes->numOfRows = (*pJob)->resNumOfRows;
SSchJob *job = taosAcquireRef(schMgmt.jobRef, *pJob);
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
taosReleaseRef(schMgmt.jobRef, *pJob);
return TSDB_CODE_SUCCESS;
}
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob) {
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, int64_t *pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false));
return TSDB_CODE_SUCCESS;
}
......@@ -1541,28 +1593,35 @@ _return:
}
int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) {
int32_t schedulerFetchRows(int64_t job, void** pData) {
if (NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
atomic_add_fetch_32(&pJob->ref, 1);
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%d", status);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
taosReleaseRef(schMgmt.jobRef, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob));
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
taosReleaseRef(schMgmt.jobRef, job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
taosReleaseRef(schMgmt.jobRef, job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
......@@ -1588,7 +1647,6 @@ int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
}
_return:
while (true) {
*pData = atomic_load_ptr(&pJob->res);
......@@ -1609,96 +1667,47 @@ _return:
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
}
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
atomic_sub_fetch_32(&pJob->ref, 1);
_return:
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
taosReleaseRef(schMgmt.jobRef, job);
SCH_RET(code);
}
int32_t scheduleCancelJob(void *job) {
SSchJob *pJob = (SSchJob *)job;
atomic_add_fetch_32(&pJob->ref, 1);
int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
int32_t code = schCancelJob(pJob);
atomic_sub_fetch_32(&pJob->ref, 1);
taosReleaseRef(schMgmt.jobRef, job);
SCH_RET(code);
}
void schedulerFreeJob(void *job) {
if (NULL == job) {
void schedulerFreeJob(int64_t job) {
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
return;
}
SSchJob *pJob = job;
uint64_t queryId = pJob->queryId;
bool setJobFree = false;
if (SCH_GET_JOB_STATUS(pJob) > 0) {
if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
return;
}
SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
while (true) {
int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) {
break;
} else if (ref > 0) {
if (1 == ref && atomic_load_8(&pJob->userFetch) > 0 && !setJobFree) {
schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
setJobFree = true;
}
usleep(1);
} else {
SCH_JOB_ELOG("invalid job ref number, ref:%d", ref);
break;
}
}
SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
}
schDropJobAllTasks(pJob);
if (atomic_load_8(&pJob->userFetch) > 0) {
schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
}
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for(int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
schFreeTask(pTask);
}
SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
taosArrayDestroy(pLevel->subTasks);
if (taosRemoveRef(schMgmt.jobRef, job)) {
SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
}
taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks);
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);
tfree(pJob->res);
tfree(pJob);
qDebug("QID:0x%"PRIx64" job freed", queryId);
}
void schedulerFreeTaskList(SArray *taskList) {
......@@ -1716,9 +1725,17 @@ void schedulerFreeTaskList(SArray *taskList) {
}
void schedulerDestroy(void) {
if (schMgmt.jobs) {
taosHashCleanup(schMgmt.jobs); //TODO
schMgmt.jobs = NULL;
if (schMgmt.jobRef) {
SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
while (pJob) {
taosRemoveRef(schMgmt.jobRef, pJob->refId);
pJob = taosIterateRef(schMgmt.jobRef, pJob->refId);
}
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = 0;
}
}
......@@ -38,15 +38,15 @@
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
#include "tref.h"
namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);
struct SSchJob *pInsertJob = NULL;
struct SSchJob *pQueryJob = NULL;
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0;
......@@ -65,6 +65,7 @@ void schtInitLogFile() {
tsAsyncLog = 0;
qDebugFlag = 159;
strcpy(tsLogDir, "/var/log/taos");
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
......@@ -255,34 +256,40 @@ void schtSetAsyncSendMsgToServer() {
void *schtSendRsp(void *param) {
SSchJob *job = NULL;
SSchJob *pJob = NULL;
int64_t job = 0;
int32_t code = 0;
while (true) {
job = *(SSchJob **)param;
job = *(int64_t *)param;
if (job) {
break;
}
usleep(1000);
}
pJob = schAcquireJob(job);
void *pIter = taosHashIterate(job->execTasks, NULL);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SSubmitRsp rsp = {0};
rsp.affectedRows = 10;
schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
schReleaseJob(job);
return NULL;
}
void *schtCreateFetchRspThread(void *param) {
struct SSchJob* job = (struct SSchJob*)param;
int64_t job = *(int64_t *)param;
SSchJob* pJob = schAcquireJob(job);
sleep(1);
......@@ -291,8 +298,10 @@ void *schtCreateFetchRspThread(void *param) {
rsp->completed = 1;
rsp->numOfRows = 10;
code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
schReleaseJob(job);
assert(code == 0);
}
......@@ -329,9 +338,9 @@ void *schtFetchRspThread(void *aa) {
void schtFreeQueryJob(int32_t freeThread) {
static uint32_t freeNum = 0;
SSchJob *job = atomic_load_ptr(&pQueryJob);
int64_t job = queryJobRefId;
if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) {
if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
schedulerFreeJob(job);
if (freeThread) {
if (++freeNum % schtTestPrintNum == 0) {
......@@ -360,7 +369,7 @@ void* schtRunJobThread(void *aa) {
schtSetExecNode();
schtSetAsyncSendMsgToServer();
SSchJob *job = NULL;
SSchJob *pJob = NULL;
SSchCallbackParam *param = NULL;
SHashObj *execTasks = NULL;
SDataBuf dataBuf = {0};
......@@ -376,24 +385,29 @@ void* schtRunJobThread(void *aa) {
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId);
assert(code == 0);
pJob = schAcquireJob(queryJobRefId);
if (NULL == pJob) {
taosArrayDestroy(qnodeList);
schtFreeQueryDag(&dag);
continue;
}
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
void *pIter = taosHashIterate(job->execTasks, NULL);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
schtFetchTaskId = task->taskId - 1;
taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
param = (SSchCallbackParam *)calloc(1, sizeof(*param));
param->queryId = schtQueryId;
pQueryJob = job;
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
pIter = taosHashIterate(execTasks, NULL);
while (pIter) {
......@@ -412,8 +426,9 @@ void* schtRunJobThread(void *aa) {
param = (SSchCallbackParam *)calloc(1, sizeof(*param));
param->queryId = schtQueryId;
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
pIter = taosHashIterate(execTasks, NULL);
while (pIter) {
SSchTask *task = (SSchTask *)pIter;
......@@ -431,7 +446,8 @@ void* schtRunJobThread(void *aa) {
param = (SSchCallbackParam *)calloc(1, sizeof(*param));
param->queryId = schtQueryId;
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
pIter = taosHashIterate(execTasks, NULL);
while (pIter) {
......@@ -450,7 +466,8 @@ void* schtRunJobThread(void *aa) {
param = (SSchCallbackParam *)calloc(1, sizeof(*param));
param->queryId = schtQueryId;
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
pIter = taosHashIterate(execTasks, NULL);
while (pIter) {
......@@ -470,7 +487,7 @@ void* schtRunJobThread(void *aa) {
atomic_store_32(&schtStartFetch, 1);
void *data = NULL;
code = schedulerFetchRows(pQueryJob, &data);
code = schedulerFetchRows(queryJobRefId, &data);
assert(code == 0 || code);
if (0 == code) {
......@@ -480,12 +497,13 @@ void* schtRunJobThread(void *aa) {
}
data = NULL;
code = schedulerFetchRows(pQueryJob, &data);
code = schedulerFetchRows(queryJobRefId, &data);
assert(code == 0 || code);
schtFreeQueryJob(0);
taosHashCleanup(execTasks);
taosArrayDestroy(qnodeList);
schtFreeQueryDag(&dag);
......@@ -516,7 +534,7 @@ TEST(queryTest, normalCase) {
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
SSchJob *pJob = NULL;
int64_t job = 0;
SQueryDag dag = {0};
schtInitLogFile();
......@@ -537,59 +555,61 @@ TEST(queryTest, normalCase) {
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &pJob);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
ASSERT_EQ(code, 0);
SSchJob *job = (SSchJob *)pJob;
void *pIter = taosHashIterate(job->execTasks, NULL);
SSchJob *pJob = schAcquireJob(job);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(job->execTasks, NULL);
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
printf("code:%d", code);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(job->execTasks, NULL);
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(job->execTasks, NULL);
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL;
code = schedulerFetchRows(job, &data);
......@@ -603,9 +623,11 @@ TEST(queryTest, normalCase) {
data = NULL;
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data);
ASSERT_TRUE(data == NULL);
schReleaseJob(job);
schedulerFreeJob(pJob);
schedulerFreeJob(job);
schtFreeQueryDag(&dag);
......@@ -644,14 +666,14 @@ TEST(insertTest, normalCase) {
pthread_attr_init(&thattr);
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
pthread_create(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SQueryResult res = {0};
code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, "insert into tb values(now,1)", &res);
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
schedulerFreeJob(pInsertJob);
schedulerFreeJob(insertJobRefId);
schedulerDestroy();
}
......@@ -684,4 +706,4 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
aux_source_directory(src SYNC_SRC)
add_library(sync ${SYNC_SRC})
add_library(sync STATIC ${SYNC_SRC})
target_link_libraries(
sync
......
aux_source_directory(src TRANSPORT_SRC)
add_library(transport ${TRANSPORT_SRC})
add_library(transport STATIC ${TRANSPORT_SRC})
target_include_directories(
transport
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/transport"
......
aux_source_directory(src WAL_SRC)
add_library(wal ${WAL_SRC})
add_library(wal STATIC ${WAL_SRC})
target_include_directories(
wal
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal"
......
aux_source_directory(src OS_SRC)
add_library(os ${OS_SRC})
add_library(os STATIC ${OS_SRC})
target_include_directories(
os
PUBLIC "${CMAKE_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include"
)
target_link_libraries(
os
PUBLIC pthread
PUBLIC dl
PUBLIC rt
PUBLIC m
os pthread dl rt m
)
\ No newline at end of file
......@@ -167,11 +167,9 @@ void taosGetSystemInfo() {
tsTotalMemoryMB = taosGetTotalMemory();
float tmp1, tmp2;
// taosGetDisk();
taosGetBandSpeed(&tmp1);
taosGetCpuUsage(&tmp1, &tmp2);
taosGetProcIO(&tmp1, &tmp2);
}
void taosKillSystem() {
......@@ -712,7 +710,6 @@ void taosGetSystemInfo() {
float tmp1, tmp2;
taosGetSysMemory(&tmp1);
taosGetProcMemory(&tmp2);
// taosGetDisk();
taosGetBandSpeed(&tmp1);
taosGetCpuUsage(&tmp1, &tmp2);
taosGetProcIO(&tmp1, &tmp2);
......
......@@ -345,6 +345,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information t
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
......
......@@ -32,7 +32,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
fd = taosOpenTcpClientSocket(ip, port, 0);
if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create http socket since %s", terrstr());
uError("failed to create http socket to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
......@@ -46,24 +46,24 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
if (taosWriteSocket(fd, (void*)header, headLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http header since %s", terrstr());
uError("failed to send http header to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
if (taosWriteSocket(fd, (void*)pCont, contLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http content since %s", terrstr());
uError("failed to send http content to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
// read something to avoid nginx error 499
if (taosReadSocket(fd, header, 10) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to receive response since %s", terrstr());
uError("failed to receive response from %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
uInfo("send http to %s:%d, len:%d content: %s", server, port, contLen, pCont);
uInfo("send http to %s:%u, len:%d content: %s", server, port, contLen, pCont);
code = 0;
SEND_OVER:
......
......@@ -26,7 +26,11 @@ SJson* tjsonCreateObject() {
return pJson;
}
void tjsonDelete(SJson* pJson) { cJSON_Delete((cJSON*)pJson); }
void tjsonDelete(SJson* pJson) {
if (pJson != NULL) {
cJSON_Delete((cJSON*)pJson);
}
}
int32_t tjsonAddIntegerToObject(SJson* pJson, const char* pName, const uint64_t number) {
char tmp[40] = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册