提交 b0d73826 编写于 作者: M Minghao Li

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

......@@ -193,6 +193,7 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV
if(${BUILD_WITH_UV})
add_compile_options(-Wno-sign-compare)
add_subdirectory(libuv)
endif(${BUILD_WITH_UV})
......
......@@ -66,7 +66,7 @@ int32_t init_env() {
}
int32_t create_topic() {
printf("create topic");
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
......@@ -91,6 +91,10 @@ int32_t create_topic() {
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
printf("commit %d\n", resp);
}
tmq_t* build_consumer() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -103,6 +107,7 @@ tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
return tmq;
}
......@@ -144,7 +149,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
}
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000;
static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0;
tmq_resp_err_t err;
......@@ -214,6 +219,6 @@ int main(int argc, char* argv[]) {
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
/*basic_consume_loop(tmq, topic_list);*/
sync_consume_loop(tmq, topic_list);
}
......@@ -198,8 +198,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi
/* --------------------------TMQ INTERFACE------------------------------- */
enum tmq_resp_err_t {
TMQ_RESP_ERR__FAIL = -1,
TMQ_RESP_ERR__SUCCESS = 0,
TMQ_RESP_ERR__FAIL = 1,
};
typedef enum tmq_resp_err_t tmq_resp_err_t;
......@@ -226,7 +226,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics);
#endif
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
......@@ -238,6 +238,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t {
......
......@@ -16,7 +16,6 @@
#ifndef TDENGINE_COMMON_H
#define TDENGINE_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
......@@ -43,14 +42,16 @@ extern "C" {
// int16_t bytes;
// } SSchema;
#define TMQ_REQ_TYPE_COMMIT_ONLY 0
#define TMQ_REQ_TYPE_CONSUME_ONLY 1
#define TMQ_REQ_TYPE_CONSUME_AND_COMMIT 2
enum {
TMQ_CONF__RESET_OFFSET__LATEST = -1,
TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
TMQ_CONF__RESET_OFFSET__NONE = -3,
};
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
SArray* pGroupList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct SColumnDataAgg {
......@@ -79,14 +80,14 @@ typedef struct SConstantItem {
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData>
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info;
SColumnDataAgg* pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData>
SArray* pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info;
} SSDataBlock;
typedef struct SVarColAttr {
int32_t *offset; // start position for each entry in the list
int32_t* offset; // start position for each entry in the list
uint32_t length; // used buffer size that contain the valid data
uint32_t allocLen; // allocated buffer size
} SVarColAttr;
......@@ -94,11 +95,11 @@ typedef struct SVarColAttr {
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData {
SColumnInfo info; // TODO filter info needs to be removed
bool hasNull;// if current column data has null value.
char *pData; // the corresponding block data in memory
SColumnInfo info; // TODO filter info needs to be removed
bool hasNull; // if current column data has null value.
char* pData; // the corresponding block data in memory
union {
char *nullbitmap; // bitmap, one bit for each item in the list
char* nullbitmap; // bitmap, one bit for each item in the list
SVarColAttr varmeta;
};
} SColumnInfoData;
......@@ -149,7 +150,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
int32_t tlen = 0;
int32_t sz = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI64(buf, pRsp->committedOffset);
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
......@@ -170,7 +170,6 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI64(buf, &pRsp->committedOffset);
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
......@@ -250,11 +249,11 @@ typedef struct SSqlExpr {
char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema;
int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
} SSqlExpr;
typedef struct SExprInfo {
......@@ -271,7 +270,7 @@ typedef struct SSessionWindow {
SColumn col;
} SSessionWindow;
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
......
......@@ -68,8 +68,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
}
#define colDataGet(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: (p1_)->pData + ((r_) * (p1_)->info.bytes));
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
......
......@@ -20,8 +20,9 @@
extern "C" {
#endif
#include "tdef.h"
#include "tcfg.h"
#include "tdef.h"
#include "tarray.h"
// cluster
extern char tsFirst[];
......@@ -29,44 +30,39 @@ extern char tsSecond[];
extern char tsLocalFqdn[];
extern char tsLocalEp[];
extern uint16_t tsServerPort;
extern int32_t tsVersion;
extern int32_t tsStatusInterval;
extern int8_t tsEnableTelemetryReporting;
extern int32_t tsNumOfSupportVnodes;
extern bool tsEnableTelemetryReporting;
// common
extern int tsRpcTimer;
extern int tsRpcMaxTime;
extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled
extern int32_t tsMaxConnections;
extern int32_t tsMaxShellConns;
extern int32_t tsShellActivityTimer;
extern uint32_t tsMaxTmrCtrl;
extern float tsNumOfThreadsPerCore;
extern int32_t tsNumOfCommitThreads;
extern float tsRatioOfQueryCores;
extern int8_t tsDaylight;
extern int8_t tsEnableCoreFile;
extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults;
extern char tsTempDir[];
extern int tsCompatibleModel; // 2.0 compatible model
extern int8_t tsEnableSlaveQuery;
extern int8_t tsEnableAdjustMaster;
extern int8_t tsPrintAuth;
extern int64_t tsTickPerDay[3];
extern int32_t tsRpcTimer;
extern int32_t tsRpcMaxTime;
extern bool tsRpcForceTcp; // all commands go to tcp protocol if this is enabled
extern int32_t tsMaxConnections;
extern int32_t tsMaxShellConns;
extern int32_t tsShellActivityTimer;
extern int32_t tsMaxTmrCtrl;
extern float tsNumOfThreadsPerCore;
extern int32_t tsNumOfCommitThreads;
extern float tsRatioOfQueryCores;
extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults;
extern int32_t tsCompatibleModel;
extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth;
extern int64_t tsTickPerDay[3];
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
extern int32_t tsRetrieveBlockingModel; // retrieve threads will be blocked
extern int8_t tsKeepOriginalColumnName;
extern int8_t tsDeadLockKillQuery;
extern bool tsRetrieveBlockingModel; // retrieve threads will be blocked
extern bool tsKeepOriginalColumnName;
extern bool tsDeadLockKillQuery;
// client
extern int32_t tsMaxWildCardsLen;
extern int32_t tsMaxRegexStringLen;
extern int8_t tsTscEnableRecordSql;
extern int32_t tsMaxNumOfOrderedResults;
extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime;
......@@ -77,19 +73,6 @@ extern float tsStreamComputDelayRatio; // the delayed computing ration of the
extern int32_t tsProjectExecInterval;
extern int64_t tsMaxRetentWindow;
// system info
extern float tsTotalLogDirGB;
extern float tsTotalTmpDirGB;
extern float tsTotalDataDirGB;
extern float tsAvailLogDirGB;
extern float tsAvailTmpDirectorySpace;
extern float tsAvailDataDirGB;
extern float tsUsedDataDirGB;
extern float tsMinimalLogDirGB;
extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB;
extern uint32_t tsVersion;
// build info
extern char version[];
extern char compatible_version[];
......@@ -105,18 +88,19 @@ extern uint32_t tsMaxRange;
extern uint32_t tsCurRange;
extern char tsCompressor[];
// tfs
extern int32_t tsDiskCfgNum;
extern SDiskCfg tsDiskCfg[];
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
void taosInitGlobalCfg();
int32_t taosCheckAndPrintCfg();
int32_t taosCfgDynamicOptions(char *msg);
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
void taosAddDataDir(int index, char *v1, int level, int primary);
void taosReadDataDirCfg(char *v1, char *v2, char *v3);
void taosPrintDataDirCfg();
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char *envFile,
const char *apolloUrl, SArray *pArgs, bool tsc);
int32_t taosInitCfg(const char *cfgDir, const char *envFile, const char *apolloUrl, SArray *pArgs, bool tsc);
void taosCleanupCfg();
void taosCfgDynamicOptions(const char *option, const char *value);
struct SConfig *taosGetCfg();
#ifdef __cplusplus
}
......
......@@ -422,8 +422,8 @@ typedef struct {
} SColumnInfo;
typedef struct {
uint64_t uid;
TSKEY key; // last accessed ts, for subscription
int64_t uid;
TSKEY key; // last accessed ts, for subscription
} STableIdInfo;
typedef struct STimeWindow {
......@@ -554,8 +554,8 @@ int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
uint64_t uid;
char db[TSDB_DB_FNAME_LEN];
int64_t uid;
} SDropDbRsp;
int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
......@@ -570,12 +570,12 @@ int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
uint64_t uid;
int32_t vgVersion;
int32_t vgNum;
int8_t hashMethod;
SArray* pVgroupInfos; // Array of SVgroupInfo
char db[TSDB_DB_FNAME_LEN];
int64_t uid;
int32_t vgVersion;
int32_t vgNum;
int8_t hashMethod;
SArray* pVgroupInfos; // Array of SVgroupInfo
} SUseDbRsp;
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
......@@ -656,9 +656,9 @@ int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp
typedef struct {
int32_t statusInterval;
int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale
char charset[TSDB_LOCALE_LEN]; // tsCharset
char timezone[TD_TIMEZONE_LEN]; // tsTimezone
char locale[TD_LOCALE_LEN]; // tsLocale
char charset[TD_LOCALE_LEN]; // tsCharset
} SClusterCfg;
typedef struct {
......@@ -725,7 +725,7 @@ typedef struct {
int32_t vgId;
int32_t dnodeId;
char db[TSDB_DB_FNAME_LEN];
uint64_t dbUid;
int64_t dbUid;
int32_t vgVersion;
int32_t cacheBlockSize;
int32_t totalBlocks;
......@@ -753,10 +753,10 @@ int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pR
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
typedef struct {
int32_t vgId;
int32_t dnodeId;
uint64_t dbUid;
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
int32_t dnodeId;
int64_t dbUid;
char db[TSDB_DB_FNAME_LEN];
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
......@@ -796,7 +796,7 @@ typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
......@@ -1268,7 +1268,7 @@ typedef struct {
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t tuid;
int64_t tuid;
int32_t sverson;
int32_t execLen;
char* executor;
......@@ -1279,11 +1279,11 @@ typedef struct {
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t tuid;
int64_t tuid;
} SDDropTopicReq;
typedef struct SVCreateTbReq {
uint64_t ver; // use a general definition
int64_t ver; // use a general definition
char* name;
uint32_t ttl;
uint32_t keep;
......@@ -1314,8 +1314,8 @@ int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
typedef struct {
uint64_t ver; // use a general definition
SArray* pArray;
int64_t ver; // use a general definition
SArray* pArray;
} SVCreateTbBatchReq;
typedef struct {
......@@ -1325,7 +1325,7 @@ int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
typedef struct {
uint64_t ver;
int64_t ver;
char* name;
uint8_t type;
tb_uid_t suid;
......@@ -1760,35 +1760,19 @@ typedef struct {
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqOffset;
typedef struct {
int32_t vgId;
SArray* offsets; // SArray<SMqOffset>
} SMqVgOffsets;
typedef struct {
int32_t num;
SMqOffset* offsets;
} SMqCMResetOffsetReq;
typedef struct {
int32_t reserved;
} SMqCMResetOffsetRsp;
typedef struct {
int64_t leftForVer;
SMqVgOffsets offsets;
} SMqMVResetOffsetReq;
} SMqCMCommitOffsetReq;
typedef struct {
int32_t reserved;
} SMqMVResetOffsetRsp;
} SMqCMCommitOffsetRsp;
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq);
int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq);
int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq);
int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq);
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
typedef struct {
uint32_t nCols;
......@@ -1870,7 +1854,6 @@ typedef struct {
typedef struct {
int64_t consumerId;
SSchemaWrapper* schemas;
int64_t committedOffset;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
......@@ -1881,22 +1864,18 @@ typedef struct {
// one req for one vg+topic
typedef struct {
SMsgHead head;
// 0: commit only, current offset
// 1: consume only, poll next offset
// 2: commit current and consume next offset
int32_t reqType;
int64_t reqId;
int64_t consumerId;
int64_t blockingTime;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t offset;
int64_t currentOffset;
char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq;
typedef struct {
int32_t vgId;
int64_t offset;
SEpSet epSet;
} SMqSubVgEp;
......@@ -1917,12 +1896,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taos
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeFixedI64(buf, &pVgEp->offset);
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return buf;
}
......
......@@ -135,18 +135,19 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "mnode-trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "mnode-kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "mnode-telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
// Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG)
......@@ -194,4 +195,3 @@ enum {
TDMT_MAX
#endif
};
// clang-format on
......@@ -73,9 +73,9 @@ typedef struct STSGroupBlockInfoEx {
} STSGroupBlockInfoEx;
typedef struct STSBuf {
FILE* f;
char path[PATH_MAX];
uint32_t fileSize;
TdFilePtr pFile;
char path[PATH_MAX];
uint32_t fileSize;
// todo use array
STSGroupBlockInfoEx* pData;
......
......@@ -91,6 +91,8 @@ typedef struct {
do { \
switch (_type) { \
case TSDB_DATA_TYPE_BOOL: \
*(bool *)(_v) = (bool)(_data); \
break; \
case TSDB_DATA_TYPE_TINYINT: \
*(int8_t *)(_v) = (int8_t)(_data); \
break; \
......@@ -104,6 +106,7 @@ typedef struct {
*(uint16_t *)(_v) = (uint16_t)(_data); \
break; \
case TSDB_DATA_TYPE_BIGINT: \
case TSDB_DATA_TYPE_TIMESTAMP: \
*(int64_t *)(_v) = (int64_t)(_data); \
break; \
case TSDB_DATA_TYPE_UBIGINT: \
......
......@@ -17,6 +17,7 @@
#define _TD_DNODE_H_
#include "tdef.h"
#include "tcfg.h"
#ifdef __cplusplus
extern "C" {
......@@ -25,26 +26,12 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ---------------- */
typedef struct SDnode SDnode;
/* ------------------------ Environment ------------------ */
typedef struct {
int32_t sver;
int32_t numOfCores;
int16_t numOfCommitThreads;
int8_t enableTelem;
char timezone[TSDB_TIMEZONE_LEN];
char locale[TSDB_LOCALE_LEN];
char charset[TSDB_LOCALE_LEN];
char buildinfo[64];
char gitinfo[48];
} SDnodeEnvCfg;
/**
* @brief Initialize the environment
*
* @param pOption Option of the environment
* @return int32_t 0 for success and -1 for failure
*/
int32_t dndInit(const SDnodeEnvCfg *pCfg);
int32_t dndInit();
/**
* @brief clear the environment
......@@ -54,17 +41,15 @@ void dndCleanup();
/* ------------------------ SDnode ----------------------- */
typedef struct {
int32_t numOfSupportVnodes;
int32_t statusInterval;
float numOfThreadsPerCore;
float ratioOfQueryCores;
int32_t maxShellConns;
int32_t shellActivityTimer;
uint16_t serverPort;
char dataDir[TSDB_FILENAME_LEN];
char localEp[TSDB_EP_LEN];
char localFqdn[TSDB_FQDN_LEN];
char firstEp[TSDB_EP_LEN];
int32_t numOfSupportVnodes;
uint16_t serverPort;
char dataDir[TSDB_FILENAME_LEN];
char localEp[TSDB_EP_LEN];
char localFqdn[TSDB_FQDN_LEN];
char firstEp[TSDB_EP_LEN];
char secondEp[TSDB_EP_LEN];
SDiskCfg *pDisks;
int32_t numOfDisks;
} SDnodeObjCfg;
/**
......
......@@ -44,25 +44,12 @@ typedef struct SMnodeLoad {
int64_t compStorage;
} SMnodeLoad;
typedef struct SMnodeCfg {
int32_t sver;
int8_t enableTelem;
int32_t statusInterval;
int32_t shellActivityTimer;
char *timezone;
char *locale;
char *charset;
char *buildinfo;
char *gitinfo;
} SMnodeCfg;
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SMnodeCfg cfg;
SDnode *pDnode;
PutReqToMWriteQFp putReqToMWriteQFp;
PutReqToMReadQFp putReqToMReadQFp;
......
......@@ -113,14 +113,15 @@ typedef enum {
SDB_USER = 7,
SDB_AUTH = 8,
SDB_ACCT = 9,
SDB_SUBSCRIBE = 10,
SDB_CONSUMER = 11,
SDB_TOPIC = 12,
SDB_VGROUP = 13,
SDB_STB = 14,
SDB_DB = 15,
SDB_FUNC = 16,
SDB_MAX = 17
SDB_OFFSET = 10,
SDB_SUBSCRIBE = 11,
SDB_CONSUMER = 12,
SDB_TOPIC = 13,
SDB_VGROUP = 14,
SDB_STB = 15,
SDB_DB = 16,
SDB_FUNC = 17,
SDB_MAX = 18
} ESdbType;
typedef struct SSdb SSdb;
......
......@@ -228,6 +228,7 @@ typedef struct SAggFunctionInfo {
typedef struct SScalarParam {
void* data;
bool colData;
int32_t num;
int32_t type;
int32_t bytes;
......
......@@ -61,22 +61,28 @@ typedef enum ENodeType {
QUERY_NODE_INTERVAL_WINDOW,
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
QUERY_NODE_COLUMN_REF,
QUERY_NODE_RAW_EXPR, // Only be used in parser module.
QUERY_NODE_TARGET,
// Only be used in parser module.
QUERY_NODE_RAW_EXPR,
QUERY_NODE_TUPLE_DESC,
QUERY_NODE_SLOT_DESC,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT,
QUERY_NODE_SHOW_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,
QUERY_NODE_LOGIC_PLAN_JOIN,
QUERY_NODE_LOGIC_PLAN_FILTER,
QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT
QUERY_NODE_LOGIC_PLAN_PROJECT,
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG
} ENodeType;
/**
......
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "querynodes.h"
#include "tmsg.h"
typedef struct SLogicNode {
ENodeType type;
......@@ -31,10 +32,20 @@ typedef struct SLogicNode {
struct SLogicNode* pParent;
} SLogicNode;
typedef enum EScanType {
SCAN_TYPE_TAG,
SCAN_TYPE_TABLE,
SCAN_TYPE_STABLE,
SCAN_TYPE_STREAM
} EScanType;
typedef struct SScanLogicNode {
SLogicNode node;
SNodeList* pScanCols;
struct STableMeta* pMeta;
EScanType scanType;
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow scanRange;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -43,10 +54,6 @@ typedef struct SJoinLogicNode {
SNode* pOnConditions;
} SJoinLogicNode;
typedef struct SFilterLogicNode {
SLogicNode node;
} SFilterLogicNode;
typedef struct SAggLogicNode {
SLogicNode node;
SNodeList* pGroupKeys;
......@@ -58,6 +65,68 @@ typedef struct SProjectLogicNode {
SNodeList* pProjections;
} SProjectLogicNode;
typedef struct SSlotDescNode {
ENodeType type;
int16_t slotId;
SDataType dataType;
bool reserve;
bool output;
} SSlotDescNode;
typedef struct STupleDescNode {
ENodeType type;
int16_t tupleId;
SNodeList* pSlots;
} STupleDescNode;
typedef struct SPhysiNode {
ENodeType type;
STupleDescNode outputTuple;
SNode* pConditions;
SNodeList* pChildren;
struct SPhysiNode* pParent;
} SPhysiNode;
typedef struct SScanPhysiNode {
SPhysiNode node;
SNodeList* pScanCols;
uint64_t uid; // unique id of the table
int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count
int32_t reverse; // reverse scan count
} SScanPhysiNode;
typedef SScanPhysiNode SSystemTableScanPhysiNode;
typedef SScanPhysiNode STagScanPhysiNode;
typedef struct STableScanPhysiNode {
SScanPhysiNode scan;
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow scanRange;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;
typedef struct SProjectPhysiNode {
SPhysiNode node;
SNodeList* pProjections;
} SProjectPhysiNode;
typedef struct SJoinPhysiNode {
SPhysiNode node;
EJoinType joinType;
SNode* pOnConditions; // in or out tuple ?
SNodeList* pTargets;
} SJoinPhysiNode;
typedef struct SAggPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function
SNodeList* pGroupKeys; // SColumnRefNode list
SNodeList* pAggFuncs;
} SAggPhysiNode;
#ifdef __cplusplus
}
#endif
......
......@@ -58,15 +58,24 @@ typedef struct SColumnNode {
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
SNode* pProjectRef;
int16_t tupleId;
int16_t slotId;
} SColumnNode;
typedef struct SColumnRefNode {
// typedef struct SColumnRefNode {
// ENodeType type;
// SDataType dataType;
// int16_t tupleId;
// int16_t slotId;
// int16_t columnId;
// } SColumnRefNode;
typedef struct STargetNode {
ENodeType type;
SDataType dataType;
int16_t tupleId;
int16_t slotId;
int16_t columnId;
} SColumnRefNode;
SNode* pExpr;
} STargetNode;
typedef struct SValueNode {
SExprNode node; // QUERY_NODE_VALUE
......@@ -81,45 +90,6 @@ typedef struct SValueNode {
} datum;
} SValueNode;
typedef enum EOperatorType {
// arithmetic operator
OP_TYPE_ADD = 1,
OP_TYPE_SUB,
OP_TYPE_MULTI,
OP_TYPE_DIV,
OP_TYPE_MOD,
// bit operator
OP_TYPE_BIT_AND,
OP_TYPE_BIT_OR,
// comparison operator
OP_TYPE_GREATER_THAN,
OP_TYPE_GREATER_EQUAL,
OP_TYPE_LOWER_THAN,
OP_TYPE_LOWER_EQUAL,
OP_TYPE_EQUAL,
OP_TYPE_NOT_EQUAL,
OP_TYPE_IN,
OP_TYPE_NOT_IN,
OP_TYPE_LIKE,
OP_TYPE_NOT_LIKE,
OP_TYPE_MATCH,
OP_TYPE_NMATCH,
OP_TYPE_IS_NULL,
OP_TYPE_IS_NOT_NULL,
OP_TYPE_IS_TRUE,
OP_TYPE_IS_FALSE,
OP_TYPE_IS_UNKNOWN,
OP_TYPE_IS_NOT_TRUE,
OP_TYPE_IS_NOT_FALSE,
OP_TYPE_IS_NOT_UNKNOWN,
// json operator
OP_TYPE_JSON_GET_VALUE,
OP_TYPE_JSON_CONTAINS
} EOperatorType;
typedef struct SOperatorNode {
SExprNode node; // QUERY_NODE_OPERATOR
EOperatorType opType;
......@@ -127,11 +97,6 @@ typedef struct SOperatorNode {
SNode* pRight;
} SOperatorNode;
typedef enum ELogicConditionType {
LOGIC_COND_TYPE_AND,
LOGIC_COND_TYPE_OR,
LOGIC_COND_TYPE_NOT,
} ELogicConditionType;
typedef struct SLogicConditionNode {
SExprNode node; // QUERY_NODE_LOGIC_CONDITION
......@@ -141,6 +106,7 @@ typedef struct SLogicConditionNode {
typedef struct SNodeListNode {
ENodeType type; // QUERY_NODE_NODE_LIST
SDataType dataType;
SNodeList* pNodeList;
} SNodeListNode;
......@@ -306,7 +272,8 @@ bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
void *nodesGetValueFromNode(SValueNode *pNode);
void* nodesGetValueFromNode(SValueNode *pNode);
#ifdef __cplusplus
}
......
......@@ -20,6 +20,14 @@ extern "C" {
#endif
typedef struct SFilterInfo SFilterInfo;
typedef int32_t (*filer_get_col_from_id)(void *, int32_t, void **);
enum {
FLT_OPTION_NO_REWRITE = 1,
FLT_OPTION_TIMESTAMP = 2,
FLT_OPTION_NEED_UNIQE = 4,
};
typedef struct SFilterColumnParam{
int32_t numOfCols;
......@@ -27,8 +35,18 @@ typedef struct SFilterColumnParam{
} SFilterColumnParam;
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_FILTER_H
\ No newline at end of file
#endif // TDENGINE_FILTER_H
......@@ -29,6 +29,7 @@ typedef struct SFilterInfo SFilterInfo;
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes);
int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst);
int32_t scalarGetOperatorParamNum(EOperatorType type);
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type);
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut);
......
......@@ -78,12 +78,15 @@ typedef struct SRpcInit {
// call back to retrieve the client auth info, for server app only
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
// call back to keep conn or not
bool (*pfp)(void *parent, tmsg_t msgType);
void *parent;
} SRpcInit;
int32_t rpcInit();
void rpcCleanup();
void * rpcOpen(const SRpcInit *pRpc);
void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void * rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
......
......@@ -120,8 +120,8 @@ typedef struct SWal {
int32_t fsyncSeq;
// meta
SWalVer vers;
int64_t writeLogTfd;
int64_t writeIdxTfd;
TdFilePtr pWriteLogTFile;
TdFilePtr pWriteIdxTFile;
int32_t writeCur;
SArray *fileInfoSet;
// status
......@@ -138,8 +138,8 @@ typedef struct SWal {
typedef struct SWalReadHandle {
SWal *pWal;
int64_t readLogTfd;
int64_t readIdxTfd;
TdFilePtr pReadLogTFile;
TdFilePtr pReadIdxTFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
......
......@@ -59,6 +59,7 @@ extern "C" {
#include "osEndian.h"
#include "osEnv.h"
#include "osFile.h"
#include "osLocale.h"
#include "osLz4.h"
#include "osMath.h"
#include "osMemory.h"
......@@ -73,6 +74,7 @@ extern "C" {
#include "osThread.h"
#include "osTime.h"
#include "osTimer.h"
#include "osTimezone.h"
void osInit();
......
......@@ -191,6 +191,10 @@ extern "C" {
#define TD_DIRSEP "/"
#endif
#define TD_LOCALE_LEN 64
#define TD_CHARSET_LEN 64
#define TD_TIMEZONE_LEN 96
#ifdef __cplusplus
}
#endif
......
......@@ -23,8 +23,8 @@ extern "C" {
void taosRemoveDir(const char *dirname);
int32_t taosDirExist(char *dirname);
int32_t taosMkDir(const char *dirname);
void taosRemoveOldFiles(char *dirname, int32_t keepDays);
int32_t taosExpandDir(char *dirname, char *outname, int32_t maxlen);
void taosRemoveOldFiles(const char *dirname, int32_t keepDays);
int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen);
int32_t taosRealPath(char *dirname, int32_t maxlen);
#ifdef __cplusplus
......
......@@ -16,15 +16,37 @@
#ifndef _TD_OS_ENV_H_
#define _TD_OS_ENV_H_
#include "osSysinfo.h"
#ifdef __cplusplus
extern "C" {
#endif
extern char tsOsName[];
extern char tsOsName[];
extern char tsTimezone[];
extern char tsCharset[];
extern char tsLocale[];
extern int8_t tsDaylight;
extern bool tsEnableCoreFile;
extern int64_t tsPageSize;
extern int64_t tsOpenMax;
extern int64_t tsStreamMax;
extern int32_t tsNumOfCores;
extern int32_t tsTotalMemoryMB;
extern char configDir[];
extern char tsDataDir[];
extern char tsLogDir[];
extern char tsScriptDir[];
extern char configDir[];
extern char tsTempDir[];
extern SDiskSpace tsDataSpace;
extern SDiskSpace tsLogSpace;
extern SDiskSpace tsTempSpace;
void osInit();
void osUpdate();
bool osLogSpaceAvailable();
void osSetTimezone(const char *timezone);
#ifdef __cplusplus
}
......
......@@ -22,49 +22,63 @@ extern "C" {
#include "osSocket.h"
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
typedef int32_t FileFd;
#else
typedef int32_t FileFd;
#ifndef ALLOW_FORBID_FUNC
#define open OPEN_FUNC_TAOS_FORBID
#define fopen FOPEN_FUNC_TAOS_FORBID
// #define close CLOSE_FUNC_TAOS_FORBID
// #define fclose FCLOSE_FUNC_TAOS_FORBID
#endif
#define FD_INITIALIZER ((int32_t)-1)
#ifndef PATH_MAX
#define PATH_MAX 256
#endif
int32_t taosLockFile(FileFd fd);
int32_t taosUnLockFile(FileFd fd);
int32_t taosUmaskFile(FileFd fd);
typedef struct TdFile *TdFilePtr;
#define TD_FILE_CTEATE 0x0001
#define TD_FILE_WRITE 0x0002
#define TD_FILE_READ 0x0004
#define TD_FILE_TRUNC 0x0008
#define TD_FILE_APPEND 0x0010
#define TD_FILE_TEXT 0x0020
#define TD_FILE_AUTO_DEL 0x0040
#define TD_FILE_EXCL 0x0080
int32_t taosLockFile(TdFilePtr pFile);
int32_t taosUnLockFile(TdFilePtr pFile);
int32_t taosUmaskFile(int32_t maskVal);
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime);
int32_t taosFStatFile(FileFd fd, int64_t *size, int32_t *mtime);
FileFd taosOpenFileWrite(const char *path);
FileFd taosOpenFileCreateWrite(const char *path);
FileFd taosOpenFileCreateWriteTrunc(const char *path);
FileFd taosOpenFileCreateWriteAppend(const char *path);
FileFd taosOpenFileRead(const char *path);
FileFd taosOpenFileReadWrite(const char *path);
int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence);
int32_t taosFtruncateFile(FileFd fd, int64_t length);
int32_t taosFsyncFile(FileFd fd);
int64_t taosReadFile(FileFd fd, void *buf, int64_t count);
int64_t taosWriteFile(FileFd fd, const void *buf, int64_t count);
void taosCloseFile(FileFd fd);
int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime);
TdFilePtr taosOpenFile(const char *path,int32_t tdFileOptions);
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence);
int32_t taosFtruncateFile(TdFilePtr pFile, int64_t length);
int32_t taosFsyncFile(TdFilePtr pFile);
int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count);
int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset);
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count);
void taosFprintfFile(TdFilePtr pFile, const char *format, ...);
int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict__ ptrBuf);
int32_t taosEOFFile(TdFilePtr pFile);
int64_t taosCloseFile(TdFilePtr *ppFile);
int32_t taosRenameFile(const char *oldName, const char *newName);
int64_t taosCopyFile(const char *from, const char *to);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size);
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size);
void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length);
bool taosValidFile(TdFilePtr pFile);
int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t size);
int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size);
int taosGetErrorFile(TdFilePtr pFile);
#ifdef __cplusplus
}
......
......@@ -13,43 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wformat"
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#pragma GCC diagnostic ignored "-Wpointer-arith"
#ifndef _TD_OS_LOCALE_H_
#define _TD_OS_LOCALE_H_
#include "os.h"
#include "osString.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "tep.h"
#include "stub.h"
#include "addr_any.h"
#include "scalar.h"
namespace {
}
TEST(scalarTest, func) {
}
#ifdef __cplusplus
extern "C" {
#endif
char *taosCharsetReplace(char *charsetstr);
void taosGetSystemLocale(char *outLocale, char *outCharset);
void taosSetSystemLocale(const char *inLocale, const char *inCharSet);
int main(int argc, char** argv) {
srand(time(NULL));
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
#ifdef __cplusplus
}
#endif
#pragma GCC diagnostic pop
#endif /*_TD_OS_LOCALE_H_*/
......@@ -55,7 +55,7 @@ void taosSetMaskSIGPIPE();
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen);
uint32_t taosInetAddr(char *ipAddr);
uint32_t taosInetAddr(const char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
......
......@@ -45,7 +45,6 @@ int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize);
bool taosValidateEncodec(const char *encodec);
char * taosCharsetReplace(char *charsetstr);
#ifdef __cplusplus
}
......
......@@ -21,18 +21,6 @@ extern "C" {
#endif
#include "os.h"
#define TSDB_LOCALE_LEN 64
#define TSDB_TIMEZONE_LEN 96
extern int64_t tsPageSize;
extern int64_t tsOpenMax;
extern int64_t tsStreamMax;
extern int32_t tsNumOfCores;
extern int32_t tsTotalMemoryMB;
extern char tsTimezone[];
extern char tsLocale[];
extern char tsCharset[]; // default encode string
typedef struct {
int64_t total;
......@@ -40,6 +28,11 @@ typedef struct {
int64_t avail;
} SDiskSize;
typedef struct {
int64_t reserved;
SDiskSize size;
} SDiskSpace;
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize);
int32_t taosGetCpuCores();
void taosGetSystemInfo();
......@@ -51,7 +44,6 @@ void taosGetDisk();
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage);
bool taosGetProcMemory(float *memoryUsedMB);
bool taosGetSysMemory(float *memoryUsedMB);
void taosPrintOsInfo();
int taosSystem(const char *cmd);
void taosKillSystem();
int32_t taosGetSystemUUID(char *uid, int32_t uidlen);
......
......@@ -13,17 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_TIMEZONE_H_
#define _TD_COMMON_TIMEZONE_H_
#ifndef _TD_OS_TIMEZONE_H_
#define _TD_OS_TIMEZONE_H_
#ifdef __cplusplus
extern "C" {
#endif
void tsSetTimeZone();
void taosGetSystemTimezone(char *outTimezone);
void taosSetSystemTimezone(const char *inTimezone, char *outTimezone, int8_t *outDaylight);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_TIMEZONE_H_*/
#endif /*_TD_OS_TIMEZONE_H_*/
......@@ -16,17 +16,18 @@
#ifndef _TD_UTIL_COMPARE_H_
#define _TD_UTIL_COMPARE_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#define TSDB_PATTERN_MATCH 0
#define TSDB_PATTERN_NOMATCH 1
#define TSDB_PATTERN_NOWILDCARDMATCH 2
#define TSDB_PATTERN_MATCH 0
#define TSDB_PATTERN_NOMATCH 1
#define TSDB_PATTERN_NOWILDCARDMATCH 2
#define TSDB_PATTERN_STRING_DEFAULT_LEN 100
#define TSDB_REGEX_STRING_DEFAULT_LEN 128
#define TSDB_REGEX_STRING_DEFAULT_LEN 128
#define FLT_COMPAR_TOL_FACTOR 4
#define FLT_EQUAL(_x, _y) (fabs((_x) - (_y)) <= (FLT_COMPAR_TOL_FACTOR * FLT_EPSILON))
......@@ -62,7 +63,6 @@ int32_t setChkNotInBytes8(const void *pLeft, const void *pRight);
int32_t compareChkInString(const void *pLeft, const void *pRight);
int32_t compareChkNotInString(const void *pLeft, const void *pRight);
int32_t compareInt8Val(const void *pLeft, const void *pRight);
int32_t compareInt16Val(const void *pLeft, const void *pRight);
int32_t compareInt32Val(const void *pLeft, const void *pRight);
......@@ -83,7 +83,6 @@ int32_t compareStrRegexComp(const void *pLeft, const void *pRight);
int32_t compareStrRegexCompMatch(const void *pLeft, const void *pRight);
int32_t compareStrRegexCompNMatch(const void *pLeft, const void *pRight);
int32_t compareInt8ValDesc(const void *pLeft, const void *pRight);
int32_t compareInt16ValDesc(const void *pLeft, const void *pRight);
int32_t compareInt32ValDesc(const void *pLeft, const void *pRight);
......@@ -102,7 +101,6 @@ int32_t compareLenPrefixedWStrDesc(const void *pLeft, const void *pRight);
__compar_fn_t getComparFunc(int32_t type, int32_t optr);
#ifdef __cplusplus
}
#endif
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
......@@ -13,85 +14,98 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_CONFIG_H
#define _TD_UTIL_CONFIG_H
#ifndef _TD_CONFIG_H_
#define _TD_CONFIG_H_
#include "os.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_CFG_MAX_NUM 115
#define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
#define TSDB_CFG_CTYPE_B_CONFIG 1U // can be configured from file
#define TSDB_CFG_CTYPE_B_SHOW 2U // can displayed by "show configs" commands
#define TSDB_CFG_CTYPE_B_LOG 4U // is a log type configuration
#define TSDB_CFG_CTYPE_B_CLIENT 8U // can be displayed in the client log
#define TSDB_CFG_CTYPE_B_OPTION 16U // can be configured by taos_options function
#define TSDB_CFG_CTYPE_B_NOT_PRINT 32U // such as password
#define MAX_FLOAT 100000
#define MIN_FLOAT 0
enum {
TAOS_CFG_CSTATUS_NONE, // not configured
TAOS_CFG_CSTATUS_DEFAULT, // use system default value
TAOS_CFG_CSTATUS_FILE, // configured from file
TAOS_CFG_CSTATUS_OPTION, // configured by taos_options function
TAOS_CFG_CSTATUS_ARG, // configured by program argument
};
enum {
TAOS_CFG_VTYPE_INT8,
TAOS_CFG_VTYPE_INT16,
TAOS_CFG_VTYPE_INT32,
TAOS_CFG_VTYPE_UINT16,
TAOS_CFG_VTYPE_FLOAT,
TAOS_CFG_VTYPE_STRING,
TAOS_CFG_VTYPE_IPSTR,
TAOS_CFG_VTYPE_DIRECTORY,
TAOS_CFG_VTYPE_DATA_DIRCTORY,
TAOS_CFG_VTYPE_DOUBLE,
};
enum {
TAOS_CFG_UTYPE_NONE,
TAOS_CFG_UTYPE_PERCENT,
TAOS_CFG_UTYPE_GB,
TAOS_CFG_UTYPE_MB,
TAOS_CFG_UTYPE_BYTE,
TAOS_CFG_UTYPE_SECOND,
TAOS_CFG_UTYPE_MS
};
#define CFG_NAME_MAX_LEN 128
typedef enum {
CFG_STYPE_DEFAULT,
CFG_STYPE_CFG_FILE,
CFG_STYPE_ENV_FILE,
CFG_STYPE_ENV_VAR,
CFG_STYPE_APOLLO_URL,
CFG_STYPE_ARG_LIST,
} ECfgSrcType;
typedef enum {
CFG_DTYPE_NONE,
CFG_DTYPE_BOOL,
CFG_DTYPE_INT32,
CFG_DTYPE_INT64,
CFG_DTYPE_FLOAT,
CFG_DTYPE_STRING,
CFG_DTYPE_DIR,
CFG_DTYPE_LOCALE,
CFG_DTYPE_CHARSET,
CFG_DTYPE_TIMEZONE
} ECfgDataType;
typedef struct SConfigItem {
ECfgSrcType stype;
ECfgDataType dtype;
bool tsc;
char *name;
union {
bool bval;
float fval;
int32_t i32;
int64_t i64;
char *str;
};
union {
int64_t imin;
double fmin;
};
union {
int64_t imax;
double fmax;
};
SArray *array; // SDiskCfg
} SConfigItem;
typedef struct {
char * option;
void * ptr;
float minValue;
float maxValue;
int8_t cfgType;
int8_t cfgStatus;
int8_t unitType;
int8_t valType;
int32_t ptrLength;
} SGlobalCfg;
extern SGlobalCfg tsGlobalConfig[];
extern int32_t tsGlobalConfigNum;
extern char * tsCfgStatusStr[];
void taosReadGlobalLogCfg();
int32_t taosReadCfgFromFile();
void taosPrintCfg();
void taosDumpGlobalCfg();
void taosAddConfigOption(SGlobalCfg cfg);
SGlobalCfg *taosGetConfigOption(const char *option);
const char *name;
const char *value;
} SConfigPair;
typedef struct SConfig SConfig;
SConfig *cfgInit();
int32_t cfgLoad(SConfig *pCfg, ECfgSrcType cfgType, const char *sourceStr);
int32_t cfgLoadArray(SConfig *pCfg, SArray *pArgs); // SConfigPair
void cfgCleanup(SConfig *pCfg);
int32_t cfgGetSize(SConfig *pCfg);
SConfigItem *cfgIterate(SConfig *pCfg, SConfigItem *pIter);
void cfgCancelIterate(SConfig *pCfg, SConfigItem *pIter);
SConfigItem *cfgGetItem(SConfig *pCfg, const char *name);
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype);
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, bool tsc);
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, bool tsc);
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, bool tsc);
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, double minval, double maxval, bool tsc);
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, bool tsc);
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, bool tsc);
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal);
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal);
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal);
const char *cfgStypeStr(ECfgSrcType type);
const char *cfgDtypeStr(ECfgDataType type);
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump);
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_CONFIG_H*/
#endif /*_TD_CONFIG_H_*/
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#ifndef _TD_UTIL_DEF_H
#define _TD_UTIL_DEF_H
......@@ -108,35 +110,52 @@ do { \
(src) = (void *)((char *)src + sizeof(type));\
} while(0)
typedef enum EOperatorType {
// arithmetic operator
OP_TYPE_ADD = 1,
OP_TYPE_SUB,
OP_TYPE_MULTI,
OP_TYPE_DIV,
OP_TYPE_MOD,
// bit operator
OP_TYPE_BIT_AND,
OP_TYPE_BIT_OR,
// comparison operator
OP_TYPE_GREATER_THAN,
OP_TYPE_GREATER_EQUAL,
OP_TYPE_LOWER_THAN,
OP_TYPE_LOWER_EQUAL,
OP_TYPE_EQUAL,
OP_TYPE_NOT_EQUAL,
OP_TYPE_IN,
OP_TYPE_NOT_IN,
OP_TYPE_LIKE,
OP_TYPE_NOT_LIKE,
OP_TYPE_MATCH,
OP_TYPE_NMATCH,
OP_TYPE_IS_NULL,
OP_TYPE_IS_NOT_NULL,
OP_TYPE_IS_TRUE,
OP_TYPE_IS_FALSE,
OP_TYPE_IS_UNKNOWN,
OP_TYPE_IS_NOT_TRUE,
OP_TYPE_IS_NOT_FALSE,
OP_TYPE_IS_NOT_UNKNOWN,
// json operator
OP_TYPE_JSON_GET_VALUE,
OP_TYPE_JSON_CONTAINS
} EOperatorType;
typedef enum ELogicConditionType {
LOGIC_COND_TYPE_AND,
LOGIC_COND_TYPE_OR,
LOGIC_COND_TYPE_NOT,
} ELogicConditionType;
// TODO: check if below is necessary
#define TSDB_RELATION_INVALID 0
#define TSDB_RELATION_LESS 1
#define TSDB_RELATION_GREATER 2
#define TSDB_RELATION_EQUAL 3
#define TSDB_RELATION_LESS_EQUAL 4
#define TSDB_RELATION_GREATER_EQUAL 5
#define TSDB_RELATION_NOT_EQUAL 6
#define TSDB_RELATION_LIKE 7
#define TSDB_RELATION_NOT_LIKE 8
#define TSDB_RELATION_ISNULL 9
#define TSDB_RELATION_NOTNULL 10
#define TSDB_RELATION_IN 11
#define TSDB_RELATION_NOT_IN 12
#define TSDB_RELATION_AND 13
#define TSDB_RELATION_OR 14
#define TSDB_RELATION_NOT 15
#define TSDB_RELATION_MATCH 16
#define TSDB_RELATION_NMATCH 17
#define TSDB_BINARY_OP_ADD 4000
#define TSDB_BINARY_OP_SUBTRACT 4001
#define TSDB_BINARY_OP_MULTIPLY 4002
#define TSDB_BINARY_OP_DIVIDE 4003
#define TSDB_BINARY_OP_REMAINDER 4004
#define TSDB_BINARY_OP_CONCAT 4005
#define FUNCTION_CEIL 4500
#define FUNCTION_FLOOR 4501
......@@ -148,9 +167,6 @@ do { \
#define FUNCTION_LTRIM 4802
#define FUNCTION_RTRIM 4803
#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN))
#define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER))
#define TSDB_NAME_DELIMITER_LEN 1
#define TSDB_UNI_LEN 24
......@@ -182,6 +198,7 @@ do { \
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_PARTITION_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
......@@ -335,9 +352,6 @@ do { \
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
......
......@@ -30,20 +30,20 @@ void tfCleanup();
// the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused
int64_t tfOpenRead(const char *pathname);
int64_t tfOpenReadWrite(const char *pathname);
int64_t tfOpenCreateWrite(const char *pathname);
int64_t tfOpenCreateWriteAppend(const char *pathname);
int64_t tfClose(int64_t tfd);
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
int64_t tfPread(int64_t tfd, void *buf, int64_t count, int64_t offset);
int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd);
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
int32_t tfFtruncate(int64_t tfd, int64_t length);
void * tfMmapReadOnly(int64_t tfd, int64_t length);
// int64_t tfOpenRead(const char *pathname);
// int64_t tfOpenReadWrite(const char *pathname);
// int64_t tfOpenCreateWrite(const char *pathname);
// int64_t tfOpenCreateWriteAppend(const char *pathname);
// int64_t tfClose(int64_t tfd);
// int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
// int64_t tfRead(int64_t tfd, void *buf, int64_t count);
// int64_t tfPread(int64_t tfd, void *buf, int64_t count, int64_t offset);
// int32_t tfFsync(int64_t tfd);
// bool tfValid(int64_t tfd);
// int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
// int32_t tfFtruncate(int64_t tfd, int64_t length);
// void * tfMmapReadOnly(int64_t tfd, int64_t length);
#ifdef __cplusplus
}
#endif
......
......@@ -22,8 +22,7 @@
extern "C" {
#endif
// log
extern int8_t tsAsyncLog;
extern bool tsAsyncLog;
extern int32_t tsNumOfLogLines;
extern int32_t tsLogKeepDays;
extern int32_t dDebugFlag;
......@@ -32,9 +31,6 @@ extern int32_t mDebugFlag;
extern int32_t cDebugFlag;
extern int32_t jniDebugFlag;
extern int32_t tmrDebugFlag;
extern int32_t httpDebugFlag;
extern int32_t mqttDebugFlag;
extern int32_t monDebugFlag;
extern int32_t uDebugFlag;
extern int32_t rpcDebugFlag;
extern int32_t qDebugFlag;
......@@ -42,23 +38,23 @@ extern int32_t wDebugFlag;
extern int32_t sDebugFlag;
extern int32_t tsdbDebugFlag;
extern int32_t tqDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
#define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL
#define DEBUG_WARN 2U
#define DEBUG_INFO DEBUG_WARN
#define DEBUG_DEBUG 4U
#define DEBUG_TRACE 8U
#define DEBUG_DUMP 16U
extern int32_t fsDebugFlag;
#define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL
#define DEBUG_WARN 2U
#define DEBUG_INFO DEBUG_WARN
#define DEBUG_DEBUG 4U
#define DEBUG_TRACE 8U
#define DEBUG_DUMP 16U
#define DEBUG_SCREEN 64U
#define DEBUG_FILE 128U
#define DEBUG_FILE 128U
int32_t taosInitLog(char *logName, int32_t numOfLogLines, int32_t maxFiles);
int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog();
void taosResetLog();
void taosSetAllDebugFlag(int32_t flag);
void taosDumpData(uint8_t *msg, int32_t len);
void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...)
#ifdef __GNUC__
......@@ -72,7 +68,17 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
#endif
;
void taosDumpData(unsigned char *msg, int32_t len);
extern int8_t tscEmbeddedInUtil;
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); }
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_NOTE_H
#define _TD_UTIL_NOTE_H
#ifdef __cplusplus
extern "C" {
#endif
#define MAX_NOTE_LINE_SIZE 66000
#define NOTE_FILE_NAME_LEN 300
typedef struct {
int32_t fileNum;
int32_t maxLines;
int32_t lines;
int32_t flag;
int32_t fd;
int32_t openInProgress;
char name[NOTE_FILE_NAME_LEN];
pthread_mutex_t mutex;
} SNoteObj;
extern SNoteObj tsHttpNote;
extern SNoteObj tsTscNote;
extern SNoteObj tsInfoNote;
int32_t taosInitNotes();
void taosNotePrint(SNoteObj* pNote, const char* const format, ...);
void taosNotePrintBuffer(SNoteObj* pNote, char* buffer, int32_t len);
#define nPrintHttp(...) \
if (tsHttpEnableRecordSql) { \
taosNotePrint(&tsHttpNote, __VA_ARGS__); \
}
#define nPrintTsc(...) \
if (tsTscEnableRecordSql) { \
taosNotePrint(&tsTscNote, __VA_ARGS__); \
}
#define nInfo(buffer, len) \
if (tscEmbeddedInUtil == 1) { \
taosNotePrintBuffer(&tsInfoNote, buffer, len); \
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_NOTE_H*/
......@@ -26,7 +26,6 @@ typedef void *tmr_h;
typedef void (*TAOS_TMR_CALLBACK)(void *, void *);
extern int taosTmrThreads;
extern uint32_t tsMaxTmrCtrl;
#define MSECONDS_PER_TICK 5
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_ULOG_H
#define _TD_UTIL_ULOG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "tlog.h"
extern int32_t uDebugFlag;
extern int8_t tscEmbeddedInUtil;
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); }
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_ULOG_H*/
......@@ -13,17 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_LOCALE_H_
#define _TD_COMMON_LOCALE_H_
#ifndef _TD_UTIL_VERSION_H
#define _TD_UTIL_VERSION_H
#ifdef __cplusplus
extern "C" {
#endif
void tsSetLocale();
extern char version[];
extern char compatible_version[];
extern char gitinfo[];
extern char gitinfoOfInternal[];
extern char buildinfo[];
#ifdef __cplusplus
}
#endif /*_TD_COMMON_LOCALE_H_*/
#endif
#endif /*_TD_UTIL_VERSION_H*/
......@@ -20,17 +20,19 @@
extern "C" {
#endif
#include "taos.h"
#include "common.h"
#include "tmsg.h"
#include "parser.h"
#include "query.h"
#include "taos.h"
#include "tdef.h"
#include "tep.h"
#include "thash.h"
#include "tlist.h"
#include "tmsg.h"
#include "tmsgtype.h"
#include "trpc.h"
#include "query.h"
#include "parser.h"
#include "tconfig.h"
#define CHECK_CODE_GOTO(expr, label) \
do { \
......@@ -46,12 +48,12 @@ extern "C" {
typedef struct SAppInstInfo SAppInstInfo;
typedef struct SHbConnInfo {
void *param;
SClientHbReq *req;
void* param;
SClientHbReq* req;
} SHbConnInfo;
typedef struct SAppHbMgr {
char *key;
char* key;
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
......@@ -62,15 +64,13 @@ typedef struct SAppHbMgr {
// connection
SAppInstInfo* pAppInstInfo;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
} SAppHbMgr;
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req);
typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req);
typedef struct SClientHbMgr {
int8_t inited;
......@@ -83,63 +83,62 @@ typedef struct SClientHbMgr {
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
typedef struct SQueryExecMetric {
int64_t start; // start timestamp
int64_t parsed; // start to parse
int64_t send; // start to send to server
int64_t rsp; // receive response from server
int64_t start; // start timestamp
int64_t parsed; // start to parse
int64_t send; // start to send to server
int64_t rsp; // receive response from server
} SQueryExecMetric;
typedef struct SInstanceSummary {
uint64_t numOfInsertsReq;
uint64_t numOfInsertRows;
uint64_t insertElapsedTime;
uint64_t insertBytes; // submit to tsdb since launched.
uint64_t fetchBytes;
uint64_t queryElapsedTime;
uint64_t numOfSlowQueries;
uint64_t totalRequests;
uint64_t currentRequests; // the number of SRequestObj
uint64_t numOfInsertsReq;
uint64_t numOfInsertRows;
uint64_t insertElapsedTime;
uint64_t insertBytes; // submit to tsdb since launched.
uint64_t fetchBytes;
uint64_t queryElapsedTime;
uint64_t numOfSlowQueries;
uint64_t totalRequests;
uint64_t currentRequests; // the number of SRequestObj
} SInstanceSummary;
typedef struct SHeartBeatInfo {
void *pTimer; // timer, used to send request msg to mnode
void* pTimer; // timer, used to send request msg to mnode
} SHeartBeatInfo;
struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
SInstanceSummary summary;
SList *pConnList; // STscObj linked list
int64_t clusterId;
void *pTransporter;
struct SAppHbMgr *pAppHbMgr;
int64_t numOfConns;
SCorEpSet mgmtEp;
SInstanceSummary summary;
SList* pConnList; // STscObj linked list
int64_t clusterId;
void* pTransporter;
struct SAppHbMgr* pAppHbMgr;
};
typedef struct SAppInfo {
int64_t startTime;
char appName[TSDB_APP_NAME_LEN];
char *ep;
char* ep;
int32_t pid;
int32_t numOfThreads;
SHashObj *pInstMap;
SHashObj* pInstMap;
pthread_mutex_t mutex;
} SAppInfo;
typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char ver[128];
int32_t acctId;
uint32_t connId;
int32_t connType;
uint64_t id; // ref ID returned by taosAddRef
pthread_mutex_t mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo *pAppInfo;
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char ver[128];
int32_t acctId;
uint32_t connId;
int32_t connType;
uint64_t id; // ref ID returned by taosAddRef
pthread_mutex_t mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
} STscObj;
typedef struct SMqConsumer {
......@@ -147,49 +146,49 @@ typedef struct SMqConsumer {
} SMqConsumer;
typedef struct SReqResultInfo {
const char *pRspMsg;
const char *pData;
TAOS_FIELD *fields;
uint32_t numOfCols;
int32_t *length;
TAOS_ROW row;
char **pCol;
uint32_t numOfRows;
uint64_t totalRows;
uint32_t current;
bool completed;
const char* pRspMsg;
const char* pData;
TAOS_FIELD* fields;
uint32_t numOfCols;
int32_t* length;
TAOS_ROW row;
char** pCol;
uint32_t numOfRows;
uint64_t totalRows;
uint32_t current;
bool completed;
} SReqResultInfo;
typedef struct SShowReqInfo {
int64_t execId; // showId/queryId
int32_t vgId;
SArray *pArray; // SArray<SVgroupInfo>
int32_t currentIndex; // current accessed vgroup index.
int64_t execId; // showId/queryId
int32_t vgId;
SArray* pArray; // SArray<SVgroupInfo>
int32_t currentIndex; // current accessed vgroup index.
} SShowReqInfo;
typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now
tsem_t rspSem; // not used now
void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg;
struct SSchJob *pQueryJob; // query job, created according to sql query DAG.
struct SQueryDag *pDag; // the query dag, generated according to the sql statement.
struct SSchJob* pQueryJob; // query job, created according to sql query DAG.
struct SQueryDag* pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo;
} SRequestSendRecvBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
typedef struct SRequestObj {
uint64_t requestId;
int32_t type; // request type
STscObj *pTscObj;
char *sqlstr; // sql string
int32_t sqlLen;
int64_t self;
char *msgBuf;
void *pInfo; // sql parse info, generated by parser module
int32_t code;
SQueryExecMetric metric;
uint64_t requestId;
int32_t type; // request type
STscObj* pTscObj;
char* sqlstr; // sql string
int32_t sqlLen;
int64_t self;
char* msgBuf;
void* pInfo; // sql parse info, generated by parser module
int32_t code;
SQueryExecMetric metric;
SRequestSendRecvBody body;
} SRequestObj;
......@@ -198,51 +197,52 @@ extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool;
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int taos_init();
int taos_init();
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo);
void destroyTscObj(void*pObj);
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj);
uint64_t generateRequestId();
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest);
char *getDbOfConnection(STscObj* pObj);
char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);
void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char *str);
int taos_options_imp(TSDB_OPTION option, const char* str);
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp();
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
void *doFetchRow(SRequestObj* pRequest);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
void* doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery);
// --- heartbeat
// --- heartbeat
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key);
void appHbMgrCleanup(void);
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType);
......@@ -253,7 +253,6 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq
void hbMgrInitMqHbRspHandle();
#ifdef __cplusplus
}
#endif
......
......@@ -13,33 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
#include "tmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "ttime.h"
#include "ttimezone.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
#define TSC_VAR_RELEASED 0
SAppInfo appInfo;
int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1;
SAppInfo appInfo;
int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0;
volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj* pRequest) {
static void registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id);
assert(pTscObj != NULL);
......@@ -53,60 +50,49 @@ static void registerRequest(SRequestObj* pRequest) {
int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self,
pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
}
}
static void deregisterRequest(SRequestObj* pRequest) {
static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL);
STscObj* pTscObj = pRequest->pTscObj;
SInstanceSummary* pActivity = &pTscObj->pAppInfo->summary;
STscObj * pTscObj = pRequest->pTscObj;
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampMs() - pRequest->metric.start;
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", reqId:0x%"PRIx64" elapsed:%"PRIu64" ms, current:%d, app current:%d", pRequest->self, pTscObj->id,
pRequest->requestId, duration, num, currentInst);
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration, num, currentInst);
taosReleaseRef(clientConnRefPool, pTscObj->id);
}
static void tscInitLogFile() {
taosReadGlobalLogCfg();
if (mkdir(tsLogDir, 0755) != 0 && errno != EEXIST) {
printf("failed to create log dir:%s\n", tsLogDir);
}
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
char temp[128] = {0};
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
// todo close the transporter properly
void closeTransporter(STscObj* pTscObj) {
void closeTransporter(STscObj *pTscObj) {
if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
return;
}
tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
rpcClose(pTscObj->pAppInfo->pTransporter);
}
// TODO refactor
void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "TSC";
rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer;
rpcInit.pfp = persistConnForSpecificMsg;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;
......@@ -115,7 +101,7 @@ void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.spi = 1;
rpcInit.secret = (char *)auth;
void* pDnodeConn = rpcOpen(&rpcInit);
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
tscError("failed to init connection to server");
return NULL;
......@@ -130,12 +116,12 @@ void destroyTscObj(void *pObj) {
SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
pthread_mutex_destroy(&pTscObj->mutex);
tfree(pTscObj);
}
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo) {
void *createTscObj(const char *user, const char *auth, const char *db, SAppInstInfo *pAppInfo) {
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -153,11 +139,11 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI
pthread_mutex_init(&pObj->mutex, NULL);
pObj->id = taosAddRef(clientConnRefPool, pObj);
tscDebug("connObj created, 0x%"PRIx64, pObj->id);
tscDebug("connObj created, 0x%" PRIx64, pObj->id);
return pObj;
}
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) {
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
assert(pObj != NULL);
SRequestObj *pRequest = (SRequestObj *)calloc(1, sizeof(SRequestObj));
......@@ -166,20 +152,20 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
return NULL;
}
pRequest->requestId = generateRequestId();
pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampMs();
pRequest->type = type;
pRequest->pTscObj = pObj;
pRequest->body.fp = fp; // not used it yet
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->type = type;
pRequest->pTscObj = pObj;
pRequest->body.fp = fp; // not used it yet
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
tsem_init(&pRequest->body.rspSem, 0, 0);
registerRequest(pRequest);
return pRequest;
}
static void doFreeReqResultInfo(SReqResultInfo* pResInfo) {
static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
tfree(pResInfo->pRspMsg);
tfree(pResInfo->length);
tfree(pResInfo->row);
......@@ -187,9 +173,9 @@ static void doFreeReqResultInfo(SReqResultInfo* pResInfo) {
tfree(pResInfo->fields);
}
static void doDestroyRequest(void* p) {
static void doDestroyRequest(void *p) {
assert(p != NULL);
SRequestObj* pRequest = (SRequestObj*)p;
SRequestObj *pRequest = (SRequestObj *)p;
assert(RID_VALID(pRequest->self));
......@@ -208,7 +194,7 @@ static void doDestroyRequest(void* p) {
tfree(pRequest);
}
void destroyRequest(SRequestObj* pRequest) {
void destroyRequest(SRequestObj *pRequest) {
if (pRequest == NULL) {
return;
}
......@@ -225,16 +211,17 @@ void taos_init_imp(void) {
srand(taosGetTimestampSec());
deltaToUtcInitOnce();
taosInitGlobalCfg();
taosReadCfgFromFile();
tscInitLogFile();
if (taosCheckAndPrintCfg()) {
if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, 1) != 0) {
tscInitRes = -1;
return;
}
if (taosInitCfg(configDir, NULL, NULL, NULL, 1) != 0) {
tscInitRes = -1;
return;
}
taosInitNotes();
initMsgHandleFp();
initQueryModuleMsgHandle();
......@@ -245,21 +232,21 @@ void taos_init_imp(void) {
SSchedulerCfg scfg = {.maxJobNum = 100};
schedulerInit(&scfg);
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
tscDebug("starting to initialize TAOS driver");
taosSetCoreDump(true);
initTaskQueue();
clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
taosGetAppName(appInfo.appName, NULL);
pthread_mutex_init(&appInfo.mutex, NULL);
appInfo.pid = taosGetPId();
appInfo.pid = taosGetPId();
appInfo.startTime = taosGetTimestampMs();
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tscDebug("client is initialized successfully");
}
......@@ -269,6 +256,7 @@ int taos_init() {
}
int taos_options_imp(TSDB_OPTION option, const char *str) {
#if 0
SGlobalCfg *cfg = NULL;
switch (option) {
......@@ -281,7 +269,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscInfo("set config file directory:%s", str);
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
......@@ -296,7 +285,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscInfo("set shellActivityTimer:%d", tsShellActivityTimer);
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
}
break;
......@@ -305,7 +295,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
assert(cfg != NULL);
size_t len = strlen(str);
if (len == 0 || len > TSDB_LOCALE_LEN) {
if (len == 0 || len > TD_LOCALE_LEN) {
tscInfo("Invalid locale:%s, use default", str);
return -1;
}
......@@ -313,8 +303,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
char sep = '.';
if (strlen(tsLocale) == 0) { // locale does not set yet
char* defaultLocale = setlocale(LC_CTYPE, "");
if (strlen(tsLocale) == 0) { // locale does not set yet
char *defaultLocale = setlocale(LC_CTYPE, "");
// The locale of the current OS does not be set correctly, so the default locale cannot be acquired.
// The launch of current system will abort soon.
......@@ -323,21 +313,21 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return -1;
}
tstrncpy(tsLocale, defaultLocale, TSDB_LOCALE_LEN);
tstrncpy(tsLocale, defaultLocale, TD_LOCALE_LEN);
}
// set the user specified locale
char *locale = setlocale(LC_CTYPE, str);
if (locale != NULL) { // failed to set the user specified locale
if (locale != NULL) { // failed to set the user specified locale
tscInfo("locale set, prev locale:%s, new locale:%s", tsLocale, locale);
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
} else { // set the user specified locale failed, use default LC_CTYPE as current locale
} else { // set the user specified locale failed, use default LC_CTYPE as current locale
locale = setlocale(LC_CTYPE, tsLocale);
tscInfo("failed to set locale:%s, current locale:%s", str, tsLocale);
}
tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN);
tstrncpy(tsLocale, locale, TD_LOCALE_LEN);
char *charset = strrchr(tsLocale, sep);
if (charset != NULL) {
......@@ -352,7 +342,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
tscInfo("charset changed from %s to %s", tsCharset, charset);
}
tstrncpy(tsCharset, charset, TSDB_LOCALE_LEN);
tstrncpy(tsCharset, charset, TD_LOCALE_LEN);
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
} else {
......@@ -360,11 +350,12 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
}
free(charset);
} else { // it may be windows system
} else { // it may be windows system
tscInfo("charset remains:%s", tsCharset);
}
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
}
......@@ -375,7 +366,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
assert(cfg != NULL);
size_t len = strlen(str);
if (len == 0 || len > TSDB_LOCALE_LEN) {
if (len == 0 || len > TD_LOCALE_LEN) {
tscInfo("failed to set charset:%s", str);
return -1;
}
......@@ -388,13 +379,14 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
tscInfo("charset changed from %s to %s", tsCharset, str);
}
tstrncpy(tsCharset, str, TSDB_LOCALE_LEN);
tstrncpy(tsCharset, str, TD_LOCALE_LEN);
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
} else {
tscInfo("charset:%s not valid", str);
}
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
......@@ -405,12 +397,13 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
assert(cfg != NULL);
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
tstrncpy(tsTimezone, str, TSDB_TIMEZONE_LEN);
tstrncpy(tsTimezone, str, TD_TIMEZONE_LEN);
tsSetTimeZone();
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscDebug("timezone set:%s, input:%s by taos_options", tsTimezone, str);
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str,
tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
......@@ -419,7 +412,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
tscError("Invalid option %d", option);
return -1;
}
#endif
return 0;
}
......@@ -434,7 +427,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
*/
uint64_t generateRequestId() {
static uint64_t hashId = 0;
static int32_t requestSerialId = 0;
static int32_t requestSerialId = 0;
if (hashId == 0) {
char uid[64] = {0};
......@@ -448,9 +441,9 @@ uint64_t generateRequestId() {
}
}
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
......
......@@ -8,7 +8,6 @@
#include "tep.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedbuf.h"
#include "tref.h"
......@@ -254,11 +253,9 @@ TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
return NULL;
}
nPrintTsc("%s", sql)
SRequestObj* pRequest = NULL;
SQueryNode* pQueryNode = NULL;
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
SRequestObj* pRequest = NULL;
SQueryNode* pQueryNode = NULL;
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
......@@ -370,7 +367,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest;
SConnectReq connectReq = {0};
STscObj* pObj = pRequest->pTscObj;
......@@ -398,7 +394,9 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree(pMsgBody->msgInfo.pData);
tfree(pMsgBody);
}
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
assert(pMsg->ahandle != NULL);
......
......@@ -7,6 +7,7 @@
#include "tmsg.h"
#include "tglobal.h"
#include "catalog.h"
#include "version.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
......@@ -56,9 +57,7 @@ void taos_cleanup(void) {
}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
int32_t p = (port != 0) ? port : tsServerPort;
tscDebug("try to connect to %s:%u, user:%s db:%s", ip, p, user, db);
tscDebug("try to connect to %s:%u, user:%s db:%s", ip, port, user, db);
if (user == NULL) {
user = TSDB_DEFAULT_USER;
}
......@@ -67,7 +66,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass = TSDB_DEFAULT_PASS;
}
return taos_connect_internal(ip, user, pass, NULL, db, p);
return taos_connect_internal(ip, user, pass, NULL, db, port);
}
void taos_close(TAOS* taos) {
......
......@@ -24,7 +24,6 @@
#include "tep.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedbuf.h"
#include "tref.h"
......@@ -45,22 +44,24 @@ struct tmq_topic_vgroup_list_t {
};
struct tmq_conf_t {
char clientId[256];
char groupId[256];
bool auto_commit;
char clientId[256];
char groupId[256];
int8_t auto_commit;
int8_t resetOffset;
tmq_commit_cb* commit_cb;
/*char* ip;*/
/*uint16_t port;*/
tmq_commit_cb* commit_cb;
};
struct tmq_t {
// conf
char groupId[256];
char clientId[256];
bool autoCommit;
int8_t autoCommit;
SRWLatch lock;
int64_t consumerId;
int32_t epoch;
int32_t resetOffsetCfg;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
......@@ -79,7 +80,6 @@ typedef struct {
// statistics
int64_t pollCnt;
// offset
int64_t committedOffset;
int64_t currentOffset;
// connection info
int32_t vgId;
......@@ -115,21 +115,17 @@ typedef struct {
} SMqConsumeCbParam;
typedef struct {
tmq_t* tmq;
SMqClientVg* pVg;
int32_t async;
tsem_t rspSem;
} SMqCommitCbParam;
typedef struct {
tmq_t* tmq;
tmq_t* tmq;
/*SMqClientVg* pVg;*/
int32_t async;
tsem_t rspSem;
tmq_resp_err_t rspErr;
} SMqResetOffsetParam;
} SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
conf->auto_commit = false;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
return conf;
}
......@@ -157,6 +153,20 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_INVALID;
}
}
if (strcmp(key, "auto.offset.reset") == 0) {
if (strcmp(value, "none") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
return TMQ_CONF_OK;
} else if (strcmp(value, "earliest") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return TMQ_CONF_OK;
} else if (strcmp(value, "latest") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
}
return TMQ_CONF_UNKNOWN;
}
......@@ -190,14 +200,12 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (pParam->tmq->commit_cb) {
pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL);
}
if (!pParam->async) tsem_post(&pParam->rspSem);
return 0;
}
int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param;
pParam->rspErr = code;
tsem_post(&pParam->rspSem);
if (!pParam->async)
tsem_post(&pParam->rspSem);
else {
tsem_destroy(&pParam->rspSem);
free(param);
}
return 0;
}
......@@ -216,6 +224,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->auto_commit;
pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
......@@ -223,18 +232,40 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return pTmq;
}
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
SRequestObj* pRequest = NULL;
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
// TODO: add read write lock
SRequestObj* pRequest = NULL;
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
// build msg
// send to mnode
SMqCMResetOffsetReq req;
req.num = offsets->cnt;
req.offsets = (SMqOffset*)offsets->elems;
SMqCMCommitOffsetReq req;
SArray* pArray = NULL;
if (offsets == NULL) {
pArray = taosArrayInit(0, sizeof(SMqOffset));
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqOffset offset;
strcpy(offset.topicName, pTopic->topicName);
strcpy(offset.cgroup, tmq->groupId);
offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset;
taosArrayPush(pArray, &offset);
}
}
req.num = pArray->size;
req.offsets = pArray->pData;
} else {
req.num = offsets->cnt;
req.offsets = (SMqOffset*)offsets->elems;
}
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSMqCMResetOffsetReq(&encoder, &req);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos;
void* buf = malloc(tlen);
if (buf == NULL) {
......@@ -244,32 +275,41 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse
tCoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
tEncodeSMqCMResetOffsetReq(&encoder, &req);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tCoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) {
tscError("failed to malloc request");
}
SMqResetOffsetParam param = {0};
tsem_init(&param.rspSem, 0, 0);
param.tmq = tmq;
SMqCommitCbParam* pParam = malloc(sizeof(SMqCommitCbParam));
if (pParam == NULL) {
return -1;
}
pParam->tmq = tmq;
tsem_init(&pParam->rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = &param;
sendInfo->fp = tmqResetOffsetCb;
sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
if (!async) {
tsem_wait(&pParam->rspSem);
resp = pParam->rspErr;
}
return param.rspErr;
if (pArray) {
taosArrayDestroy(pArray);
}
return resp;
}
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
......@@ -641,8 +681,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
// clang-format off
SMqClientVg clientVg = {
.pollCnt = 0,
.committedOffset = -1,
.currentOffset = -1,
.currentOffset = pVgEp->offset,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet
};
......@@ -708,23 +747,51 @@ END:
return 0;
}
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic,
SMqClientVg* pVg) {
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
const SMqOffset* pOffset = &offset->offset;
if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
return TMQ_RESP_ERR__FAIL;
}
int32_t sz = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
for (int32_t j = 0; j < vgSz; j++) {
SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
if (pVg->vgId == pOffset->vgId) {
pVg->currentOffset = pOffset->offset;
return TMQ_RESP_ERR__SUCCESS;
}
}
}
}
return TMQ_RESP_ERR__FAIL;
}
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset;
if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset;
} else {
if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
tscError("unable to poll since no committed offset but reset offset is set to none");
return NULL;
}
reqOffset = tmq->resetOffsetCfg;
}
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq));
if (pReq == NULL) {
return NULL;
}
pReq->reqType = type;
strcpy(pReq->topic, pTopic->topicName);
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
strcpy(pReq->cgroup, tmq->groupId);
if (type == TMQ_REQ_TYPE_COMMIT_ONLY) {
pReq->offset = pVg->currentOffset;
} else {
pReq->offset = pVg->currentOffset + 1;
}
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
pReq->currentOffset = reqOffset;
pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqConsumeReq));
......@@ -743,13 +810,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
printf("over1\n");
/*printf("over1\n");*/
usleep(blocking_time * 1000);
return NULL;
}
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
if (taosArrayGetSize(pTopic->vgs) == 0) {
printf("over2\n");
/*printf("over2\n");*/
usleep(blocking_time * 1000);
return NULL;
}
......@@ -760,8 +827,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_CONSUME_ONLY;
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg);
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
if (pReq == NULL) {
ASSERT(false);
usleep(blocking_time * 1000);
......@@ -821,6 +887,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
/*return pRequest;*/
}
#if 0
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
if (tmq_topic_vgroup_list != NULL) {
// TODO
......@@ -831,7 +898,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, TMQ_REQ_TYPE_COMMIT_ONLY, pTopic, pVg);
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
......@@ -858,6 +925,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
return 0;
}
#endif
void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
......
......@@ -49,7 +49,7 @@ int main(int argc, char** argv) {
}
TEST(testCase, driverInit_Test) {
taosInitGlobalCfg();
// taosInitGlobalCfg();
// taos_init();
}
......
......@@ -34,7 +34,7 @@ int main(int argc, char** argv) {
}
TEST(testCase, driverInit_Test) {
taosInitGlobalCfg();
// taosInitGlobalCfg();
// taos_init();
}
......
......@@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdataformat.h"
#include "ulog.h"
#include "talgo.h"
#include "tcoding.h"
#include "wchar.h"
#include "tarray.h"
#include "tlog.h"
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
#if 0
......
此差异已折叠。
此差异已折叠。
......@@ -31,7 +31,7 @@ SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFil
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
assert(!(src->lowerRelOptr == 0 && src->upperRelOptr == 0));
return pFilter;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "ulog.h"
#include "tglobal.h"
#include "tconfig.h"
#include "tutil.h"
// TODO refactor to set the tz value through parameter
void tsSetTimeZone() {
SGlobalCfg *cfg_timezone = taosGetConfigOption("timezone");
if (cfg_timezone != NULL) {
uInfo("timezone is set to %s by %s", tsTimezone, tsCfgStatusStr[cfg_timezone->cfgStatus]);
}
#ifdef WINDOWS
char winStr[TSDB_LOCALE_LEN * 2];
sprintf(winStr, "TZ=%s", tsTimezone);
putenv(winStr);
#else
setenv("TZ", tsTimezone, 1);
#endif
tzset();
/*
* get CURRENT time zone.
* system current time zone is affected by daylight saving time(DST)
*
* e.g., the local time zone of London in DST is GMT+01:00,
* otherwise is GMT+00:00
*/
#ifdef _MSC_VER
#if _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char **tzname = _tzname;
#endif
#endif
int32_t tz = (int32_t)((-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR);
tz += daylight;
/*
* format:
* (CST, +0800)
* (BST, +0100)
*/
sprintf(tsTimezone, "(%s, %s%02d00)", tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
tsDaylight = daylight;
uInfo("timezone format changed to %s", tsTimezone);
}
此差异已折叠。
......@@ -585,7 +585,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
}
void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type) {
if (optr == TSDB_BINARY_OP_ADD) {
if (optr == OP_TYPE_ADD) {
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)dst) = GET_INT8_VAL(s1) + GET_INT8_VAL(s2);
......
aux_source_directory(src DAEMON_SRC)
add_executable(taosd ${DAEMON_SRC})
target_link_libraries(
target_include_directories(
taosd
PUBLIC dnode
PUBLIC util
PUBLIC os
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(taosd dnode util os)
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -35,6 +35,7 @@ extern "C" {
#include "tthread.h"
#include "ttime.h"
#include "tworker.h"
#include "tglobal.h"
#include "dnode.h"
......
此差异已折叠。
......@@ -25,11 +25,6 @@ void* serverLoop(void* param) {
SDnodeObjCfg TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SDnodeObjCfg cfg = {0};
cfg.numOfSupportVnodes = 16;
cfg.statusInterval = 1;
cfg.numOfThreadsPerCore = 1;
cfg.ratioOfQueryCores = 1;
cfg.maxShellConns = 1000;
cfg.shellActivityTimer = 30;
cfg.serverPort = port;
strcpy(cfg.dataDir, path);
snprintf(cfg.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册