提交 79596698 编写于 作者: H Haojun Liao

other: resolve conflict with 3.0 branch.

...@@ -7,4 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT} ...@@ -7,4 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT}
# [Optional] Uncomment this section to install additional packages. # [Optional] Uncomment this section to install additional packages.
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ # RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
# && apt-get -y install --no-install-recommends <your-package-list-here> # && apt-get -y install --no-install-recommends <your-package-list-here>
RUN apt-get update && apt-get -y install tree vim RUN apt-get update && apt-get -y install tree vim tmux
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
// Use 'forwardPorts' to make a list of ports inside the container available locally. // Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [], // "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created. // Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "gcc -v", "postCreateCommand": "wget https://raw.githubusercontent.com/hzcheng/config/master/.tmux.conf -P /root",
// Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. // Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "root" "remoteUser": "root"
} }
\ No newline at end of file
...@@ -54,7 +54,8 @@ int32_t init_env() { ...@@ -54,7 +54,8 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c4 int) tags(t1 int)"); pRes =
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -101,7 +102,7 @@ int32_t create_topic() { ...@@ -101,7 +102,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/ /*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c4 from ct1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -144,6 +145,7 @@ void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_ ...@@ -144,6 +145,7 @@ void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_
} }
tmq_t* build_consumer() { tmq_t* build_consumer() {
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -152,11 +154,15 @@ tmq_t* build_consumer() { ...@@ -152,11 +154,15 @@ tmq_t* build_consumer() {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2"); tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", "abc1");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
return tmq; return tmq;
} }
......
...@@ -247,10 +247,10 @@ DLL_EXPORT tmq_list_t *tmq_list_new(); ...@@ -247,10 +247,10 @@ DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
// will be removed in 3.0 #if 1
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
#endif
// will replace last one
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
......
...@@ -332,6 +332,7 @@ int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); ...@@ -332,6 +332,7 @@ int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(void* buf, SEpSet* pEp); void* taosDecodeSEpSet(void* buf, SEpSet* pEp);
typedef struct { typedef struct {
int8_t connType;
int32_t pid; int32_t pid;
char app[TSDB_APP_NAME_LEN]; char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
...@@ -346,6 +347,7 @@ typedef struct { ...@@ -346,6 +347,7 @@ typedef struct {
int64_t clusterId; int64_t clusterId;
int32_t connId; int32_t connId;
int8_t superUser; int8_t superUser;
int8_t connType;
SEpSet epSet; SEpSet epSet;
char sVersion[128]; char sVersion[128];
} SConnectRsp; } SConnectRsp;
......
...@@ -40,6 +40,7 @@ extern "C" { ...@@ -40,6 +40,7 @@ extern "C" {
* @return timestamp decided by global conf variable, tsTimePrecision * @return timestamp decided by global conf variable, tsTimePrecision
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond. * if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
* precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond. * precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond.
* precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond.
*/ */
static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
if (precision == TSDB_TIME_PRECISION_MICRO) { if (precision == TSDB_TIME_PRECISION_MICRO) {
...@@ -51,6 +52,24 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { ...@@ -51,6 +52,24 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
} }
} }
/*
* @return timestamp of today at 00:00:00 in given precision
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
* precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond.
* precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond.
*/
static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
int64_t factor = (precision == TSDB_TIME_PRECISION_MILLI) ? 1000 :
(precision == TSDB_TIME_PRECISION_MICRO) ? 1000000 : 1000000000;
time_t t = taosTime(NULL);
struct tm * tm= taosLocalTime(&t, NULL);
tm->tm_hour = 0;
tm->tm_min = 0;
tm->tm_sec = 0;
return (int64_t)taosMktime(tm) * factor;
}
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
......
...@@ -27,9 +27,11 @@ extern "C" { ...@@ -27,9 +27,11 @@ extern "C" {
#ifndef ALLOW_FORBID_FUNC #ifndef ALLOW_FORBID_FUNC
#define strptime STRPTIME_FUNC_TAOS_FORBID #define strptime STRPTIME_FUNC_TAOS_FORBID
#define gettimeofday GETTIMEOFDAY_FUNC_TAOS_FORBID #define gettimeofday GETTIMEOFDAY_FUNC_TAOS_FORBID
#define localtime LOCALTIME_FUNC_TAOS_FORBID
#define localtime_s LOCALTIMES_FUNC_TAOS_FORBID #define localtime_s LOCALTIMES_FUNC_TAOS_FORBID
#define localtime_r LOCALTIMER_FUNC_TAOS_FORBID #define localtime_r LOCALTIMER_FUNC_TAOS_FORBID
#define time TIME_FUNC_TAOS_FORBID #define time TIME_FUNC_TAOS_FORBID
#define mktime MKTIME_FUNC_TAOS_FORBID
#endif #endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
...@@ -82,6 +84,8 @@ static FORCE_INLINE int64_t taosGetTimestampNs() { ...@@ -82,6 +84,8 @@ static FORCE_INLINE int64_t taosGetTimestampNs() {
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm); char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm);
struct tm *taosLocalTime(const time_t *timep, struct tm *result); struct tm *taosLocalTime(const time_t *timep, struct tm *result);
time_t taosTime(time_t *t);
time_t taosMktime(struct tm *timep);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -228,6 +228,12 @@ function install_header() { ...@@ -228,6 +228,12 @@ function install_header() {
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h ${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
} }
# temp install taosBenchmark
function install_taosTools() {
cd ${script_dir}/taos-tools/
tar xvf taosTools-1.4.1-Linux-x64.tar.gz && cd taosTools-1.4.1/ && ./install-taostools.sh
}
function add_newHostname_to_hosts() { function add_newHostname_to_hosts() {
localIp="127.0.0.1" localIp="127.0.0.1"
OLD_IFS="$IFS" OLD_IFS="$IFS"
...@@ -454,14 +460,14 @@ function install_service_on_systemd() { ...@@ -454,14 +460,14 @@ function install_service_on_systemd() {
} }
function install_service() { function install_service() {
if ((${service_mod}==0)); then # if ((${service_mod}==0)); then
install_service_on_systemd # install_service_on_systemd
elif ((${service_mod}==1)); then # elif ((${service_mod}==1)); then
install_service_on_sysvinit # install_service_on_sysvinit
else # else
# must manual stop taosd # # must manual stop taosd
kill_process taosd kill_process taosd
fi # fi
} }
function install_TDengine() { function install_TDengine() {
...@@ -473,6 +479,7 @@ function install_TDengine() { ...@@ -473,6 +479,7 @@ function install_TDengine() {
install_log install_log
install_header install_header
install_lib install_lib
install_taosTools
if [ -z $1 ]; then # install service and client if [ -z $1 ]; then # install service and client
# For installing new # For installing new
......
...@@ -55,6 +55,7 @@ mkdir -p ${install_dir} ...@@ -55,6 +55,7 @@ mkdir -p ${install_dir}
mkdir -p ${install_dir}/bin mkdir -p ${install_dir}/bin
mkdir -p ${install_dir}/lib mkdir -p ${install_dir}/lib
mkdir -p ${install_dir}/inc mkdir -p ${install_dir}/inc
mkdir -p ${install_dir}/taos-tools
install_files="${script_dir}/install.sh" install_files="${script_dir}/install.sh"
chmod a+x ${script_dir}/install.sh || : chmod a+x ${script_dir}/install.sh || :
...@@ -68,6 +69,8 @@ cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || : ...@@ -68,6 +69,8 @@ cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
cp ${compile_dir}/source/client/libtaos.so ${install_dir}/lib/ cp ${compile_dir}/source/client/libtaos.so ${install_dir}/lib/
cp ${compile_dir}/source/libs/tdb/libtdb.so ${install_dir}/lib/ cp ${compile_dir}/source/libs/tdb/libtdb.so ${install_dir}/lib/
taostoolfile="${top_dir}/tools/taosTools-1.4.1-Linux-x64.tar.gz"
cp ${taostoolfile} ${install_dir}/taos-tools
#cp ${compile_dir}/source/dnode/mnode/impl/libmnode.so ${install_dir}/lib/ #cp ${compile_dir}/source/dnode/mnode/impl/libmnode.so ${install_dir}/lib/
#cp ${compile_dir}/source/dnode/qnode/libqnode.so ${install_dir}/lib/ #cp ${compile_dir}/source/dnode/qnode/libqnode.so ${install_dir}/lib/
......
...@@ -45,6 +45,11 @@ extern "C" { ...@@ -45,6 +45,11 @@ extern "C" {
#define HEARTBEAT_INTERVAL 1500 // ms #define HEARTBEAT_INTERVAL 1500 // ms
enum {
CONN_TYPE__QUERY = 1,
CONN_TYPE__TMQ,
};
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
typedef struct { typedef struct {
...@@ -132,9 +137,9 @@ typedef struct STscObj { ...@@ -132,9 +137,9 @@ typedef struct STscObj {
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
char ver[128]; char ver[128];
int8_t connType;
int32_t acctId; int32_t acctId;
uint32_t connId; uint32_t connId;
int32_t connType;
uint64_t id; // ref ID returned by taosAddRef uint64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection int32_t numOfReqs; // number of sqlObj bound to this connection
...@@ -272,7 +277,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); ...@@ -272,7 +277,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp(); void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port); uint16_t port, int connType);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
......
...@@ -23,6 +23,8 @@ static SClientHbMgr clientHbMgr = {0}; ...@@ -23,6 +23,8 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread(); static int32_t hbCreateThread();
static void hbStopThread(); static void hbStopThread();
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
...@@ -297,11 +299,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req ...@@ -297,11 +299,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
void hbMgrInitMqHbHandle() { void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle; clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle; clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
} }
...@@ -438,7 +439,7 @@ static int32_t hbCreateThread() { ...@@ -438,7 +439,7 @@ static int32_t hbCreateThread() {
if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
return 0; return 0;
} }
...@@ -568,7 +569,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * ...@@ -568,7 +569,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
SClientHbKey connKey = { SClientHbKey connKey = {
.connId = connId, .connId = connId,
.hbType = HEARTBEAT_TYPE_QUERY, .hbType = hbType,
}; };
SHbConnInfo info = {0}; SHbConnInfo info = {0};
...@@ -578,16 +579,14 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3 ...@@ -578,16 +579,14 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3
*pClusterId = clusterId; *pClusterId = clusterId;
info.param = pClusterId; info.param = pClusterId;
break; return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
} }
case HEARTBEAT_TYPE_MQ: { case HEARTBEAT_TYPE_MQ: {
break; return 0;
} }
default: default:
break; return 0;
} }
return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
} }
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
#include "tref.h" #include "tref.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
...@@ -40,10 +40,10 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i ...@@ -40,10 +40,10 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
} }
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo); SAppInstInfo* pAppInfo, int connType);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port) { uint16_t port, int connType) {
if (taos_init() != TSDB_CODE_SUCCESS) { if (taos_init() != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -111,7 +111,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, ...@@ -111,7 +111,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
taosThreadMutexUnlock(&appInfo.mutex); taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(key); taosMemoryFreeClear(key);
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst); return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
} }
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) { int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
...@@ -418,7 +418,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe ...@@ -418,7 +418,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
} }
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo) { SAppInstInfo* pAppInfo, int connType) {
STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo); STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
if (NULL == pTscObj) { if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -432,7 +432,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -432,7 +432,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return NULL; return NULL;
} }
SMsgSendInfo* body = buildConnectMsg(pRequest); SMsgSendInfo* body = buildConnectMsg(pRequest, connType);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
...@@ -455,7 +455,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -455,7 +455,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj; return pTscObj;
} }
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) { if (pMsgSendInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -478,6 +478,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { ...@@ -478,6 +478,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
} }
taosMemoryFreeClear(db); taosMemoryFreeClear(db);
connectReq.connType = connType;
connectReq.pid = htonl(appInfo.pid); connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime); connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
...@@ -563,7 +564,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons ...@@ -563,7 +564,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
return NULL; return NULL;
} }
return taos_connect_internal(ip, user, NULL, auth, db, port); return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
} }
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen, TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
...@@ -595,12 +596,15 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) { ...@@ -595,12 +596,15 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
pResultInfo->row[i] = varDataVal(pStart); pResultInfo->row[i] = varDataVal(pStart);
} else { } else {
pResultInfo->row[i] = NULL; pResultInfo->row[i] = NULL;
pResultInfo->length[i] = 0;
} }
} else { } else {
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) { if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current; pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
pResultInfo->length[i] = bytes;
} else { } else {
pResultInfo->row[i] = NULL; pResultInfo->row[i] = NULL;
pResultInfo->length[i] = 0;
} }
} }
} }
......
...@@ -87,7 +87,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha ...@@ -87,7 +87,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass = TSDB_DEFAULT_PASS; pass = TSDB_DEFAULT_PASS;
} }
return taos_connect_internal(ip, user, pass, NULL, db, port); return taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
} }
void taos_close(TAOS *taos) { void taos_close(TAOS *taos) {
...@@ -124,8 +124,10 @@ const char *taos_errstr(TAOS_RES *res) { ...@@ -124,8 +124,10 @@ const char *taos_errstr(TAOS_RES *res) {
} }
void taos_free_result(TAOS_RES *res) { void taos_free_result(TAOS_RES *res) {
SRequestObj *pRequest = (SRequestObj *)res; if (TD_RES_QUERY(res)) {
destroyRequest(pRequest); SRequestObj *pRequest = (SRequestObj *)res;
destroyRequest(pRequest);
}
} }
int taos_field_count(TAOS_RES *res) { int taos_field_count(TAOS_RES *res) {
...@@ -465,26 +467,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { ...@@ -465,26 +467,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (res == NULL) { if (res == NULL) {
return 0; return 0;
} }
if (TD_RES_TMQ(res)) {
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
}
doFetchRows(pRequest, false, false);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void *)pResultInfo->pData;
return 0;
} else if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res); SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
if (pResultInfo == NULL) return -1; if (pResultInfo == NULL) return -1;
...@@ -492,11 +475,24 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { ...@@ -492,11 +475,24 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
(*numOfRows) = pResultInfo->numOfRows; (*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void *)pResultInfo->pData; (*pData) = (void *)pResultInfo->pData;
return 0; return 0;
}
} else { SRequestObj *pRequest = (SRequestObj *)res;
ASSERT(0);
return -1; if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
} }
doFetchRows(pRequest, false, false);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void *)pResultInfo->pData;
return 0;
} }
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
......
...@@ -69,9 +69,9 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -69,9 +69,9 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->clusterId = connectRsp.clusterId; pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pTscObj->connType = HEARTBEAT_TYPE_QUERY; pTscObj->connType = connectRsp.connType;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY); hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, connectRsp.connType);
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
...@@ -119,13 +119,14 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -119,13 +119,14 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (usedbRsp.vgVersion >= 0) { if (usedbRsp.vgVersion >= 0) {
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code)); tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code));
} else { } else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
} }
} }
tFreeSUsedbRsp(&usedbRsp); tFreeSUsedbRsp(&usedbRsp);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -139,7 +140,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -139,7 +140,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
SName name = {0}; SName name = {0};
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB); tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
SUseDbOutput output = {0}; SUseDbOutput output = {0};
code = queryBuildUseDbOutput(&output, &usedbRsp); code = queryBuildUseDbOutput(&output, &usedbRsp);
...@@ -151,11 +152,12 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -151,11 +152,12 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tscError("failed to build use db output since %s", terrstr()); tscError("failed to build use db output since %s", terrstr());
} else { } else {
struct SCatalog *pCatalog = NULL; struct SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code)); tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code));
} else { } else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
} }
......
...@@ -357,7 +357,15 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -357,7 +357,15 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq == NULL) { if (pTmq == NULL) {
return NULL; return NULL;
} }
pTmq->pTscObj = taos_connect(conf->ip, conf->user, conf->pass, conf->db, conf->port); const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
ASSERT(user);
ASSERT(pass);
ASSERT(conf->db);
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) return NULL;
pTmq->inWaiting = 0; pTmq->inWaiting = 0;
pTmq->status = 0; pTmq->status = 0;
...@@ -783,7 +791,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ...@@ -783,7 +791,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
} }
} }
struct tm* ptm = localtime(&tt); struct tm* ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
......
...@@ -459,11 +459,10 @@ TEST(testCase, create_multiple_tables) { ...@@ -459,11 +459,10 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes); taos_free_result(pRes);
for (int32_t i = 0; i < 25000; ++i) { for (int32_t i = 0; i < 500; i += 2) {
char sql[512] = {0}; char sql[512] = {0};
snprintf(sql, tListLen(sql), snprintf(sql, tListLen(sql),
"create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5)", i, i + 1);
(i + 1) * 30, (i + 2) * 40);
TAOS_RES* pres = taos_query(pConn, sql); TAOS_RES* pres = taos_query(pConn, sql);
if (taos_errno(pres) != 0) { if (taos_errno(pres) != 0) {
printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
...@@ -653,6 +652,7 @@ TEST(testCase, projection_query_stables) { ...@@ -653,6 +652,7 @@ TEST(testCase, projection_query_stables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif #endif
TEST(testCase, agg_query_tables) { TEST(testCase, agg_query_tables) {
...@@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) { ...@@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "select length('abc') from tu"); pRes = taos_query(pConn, "select * from test_block_raw.all_type");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
...@@ -673,6 +673,10 @@ TEST(testCase, agg_query_tables) { ...@@ -673,6 +673,10 @@ TEST(testCase, agg_query_tables) {
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
int32_t n = 0;
void* data = NULL;
int32_t code = taos_fetch_raw_block(pRes, &n, &data);
char str[512] = {0}; char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) { while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes); int32_t* length = taos_fetch_lengths(pRes);
......
...@@ -1387,7 +1387,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ...@@ -1387,7 +1387,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
} }
} }
struct tm* ptm = localtime(&tt); struct tm* ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
......
...@@ -2532,6 +2532,7 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { ...@@ -2532,6 +2532,7 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI8(&encoder, pReq->connType) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pid) < 0) return -1; if (tEncodeI32(&encoder, pReq->pid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1; if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
...@@ -2548,6 +2549,7 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { ...@@ -2548,6 +2549,7 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->connType) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
...@@ -2567,6 +2569,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -2567,6 +2569,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1; if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1; if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2585,6 +2588,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -2585,6 +2588,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -70,7 +70,7 @@ void deltaToUtcInitOnce() { ...@@ -70,7 +70,7 @@ void deltaToUtcInitOnce() {
struct tm tm = {0}; struct tm tm = {0};
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
m_deltaUtc = (int64_t)mktime(&tm); m_deltaUtc = (int64_t)taosMktime(&tm);
// printf("====delta:%lld\n\n", seconds); // printf("====delta:%lld\n\n", seconds);
} }
...@@ -344,7 +344,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { ...@@ -344,7 +344,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
} }
/* mktime will be affected by TZ, set by using taos_options */ /* mktime will be affected by TZ, set by using taos_options */
int64_t seconds = mktime(&tm); int64_t seconds = taosMktime(&tm);
int64_t fraction = 0; int64_t fraction = 0;
...@@ -539,7 +539,7 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { ...@@ -539,7 +539,7 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
return (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision)); return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision));
} }
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) { int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) {
...@@ -598,7 +598,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio ...@@ -598,7 +598,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
} }
start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision)); start = (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision));
} else { } else {
int64_t delta = t - pInterval->interval; int64_t delta = t - pInterval->interval;
int32_t factor = (delta >= 0) ? 1 : -1; int32_t factor = (delta >= 0) ? 1 : -1;
...@@ -745,7 +745,7 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) ...@@ -745,7 +745,7 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
assert(false); assert(false);
} }
ptm = localtime(&quot); ptm = taosLocalTime(&quot, NULL);
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm); int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm);
length += snprintf(ts + length, fractionLen, format, mod); length += snprintf(ts + length, fractionLen, format, mod);
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm); length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);
......
...@@ -299,7 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { ...@@ -299,7 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
goto _OVER; goto _OVER;
} }
if (vnodeInit() != 0) { if (vnodeInit(tsNumOfCommitThreads) != 0) {
dError("failed to init vnode since %s", terrstr()); dError("failed to init vnode since %s", terrstr());
goto _OVER; goto _OVER;
} }
......
...@@ -14,12 +14,13 @@ ...@@ -14,12 +14,13 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#ifndef _GRANT
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "mndGrant.h" #include "mndGrant.h"
#include "mndInt.h" #include "mndInt.h"
#ifndef _GRANT
int32_t mndInitGrant(SMnode *pMnode) { return TSDB_CODE_SUCCESS; } int32_t mndInitGrant(SMnode *pMnode) { return TSDB_CODE_SUCCESS; }
void mndCleanupGrant() {} void mndCleanupGrant() {}
void grantParseParameter() { mError("can't parsed parameter k"); } void grantParseParameter() { mError("can't parsed parameter k"); }
...@@ -30,4 +31,4 @@ void grantRestore(EGrantType grant, uint64_t value) {} ...@@ -30,4 +31,4 @@ void grantRestore(EGrantType grant, uint64_t value) {}
#endif #endif
void parseGrantParameter() { parseGrantParameter(); } void parseGrantParameter() { grantParseParameter(); }
\ No newline at end of file \ No newline at end of file
...@@ -23,13 +23,14 @@ ...@@ -23,13 +23,14 @@
#include "tglobal.h" #include "tglobal.h"
#include "version.h" #include "version.h"
#define QUERY_ID_SIZE 20 #define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18 #define QUERY_OBJ_ID_SIZE 18
#define SUBQUERY_INFO_SIZE 6 #define SUBQUERY_INFO_SIZE 6
#define QUERY_SAVE_SIZE 20 #define QUERY_SAVE_SIZE 20
typedef struct { typedef struct {
int32_t id; int32_t id;
int8_t connType;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time int64_t appStartTimeMs; // app start time
...@@ -44,8 +45,8 @@ typedef struct { ...@@ -44,8 +45,8 @@ typedef struct {
SQueryDesc *pQueries; SQueryDesc *pQueries;
} SConnObj; } SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid, static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
const char *app, int64_t startTime); int32_t pid, const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn); static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
...@@ -93,8 +94,8 @@ void mndCleanupProfile(SMnode *pMnode) { ...@@ -93,8 +94,8 @@ void mndCleanupProfile(SMnode *pMnode) {
} }
} }
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid, static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
const char *app, int64_t startTime) { int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
...@@ -102,6 +103,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui ...@@ -102,6 +103,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui
if (startTime == 0) startTime = taosGetTimestampMs(); if (startTime == 0) startTime = taosGetTimestampMs();
SConnObj connObj = {.id = connId, SConnObj connObj = {.id = connId,
.connType = connType,
.appStartTimeMs = startTime, .appStartTimeMs = startTime,
.pid = pid, .pid = pid,
.ip = ip, .ip = ip,
...@@ -159,8 +161,8 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { ...@@ -159,8 +161,8 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
} }
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) { void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
SConnObj* pConn = NULL; SConnObj *pConn = NULL;
bool hasNext = taosCacheIterNext(pIter); bool hasNext = taosCacheIterNext(pIter);
if (hasNext) { if (hasNext) {
size_t dataLen = 0; size_t dataLen = 0;
pConn = taosCacheIterGetData(pIter, &dataLen); pConn = taosCacheIterGetData(pIter, &dataLen);
...@@ -210,8 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -210,8 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
} }
} }
pConn = pConn = mndCreateConn(pMnode, pReq->user, connReq.connType, pReq->clientIp, pReq->clientPort, connReq.pid,
mndCreateConn(pMnode, pReq->user, pReq->clientIp, pReq->clientPort, connReq.pid, connReq.app, connReq.startTime); connReq.app, connReq.startTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
goto CONN_OVER; goto CONN_OVER;
...@@ -222,6 +224,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -222,6 +224,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
connectRsp.superUser = pUser->superUser; connectRsp.superUser = pUser->superUser;
connectRsp.clusterId = pMnode->clusterId; connectRsp.clusterId = pMnode->clusterId;
connectRsp.connId = pConn->id; connectRsp.connId = pConn->id;
connectRsp.connType = connReq.connType;
snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
gitinfo); gitinfo);
...@@ -343,7 +346,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { ...@@ -343,7 +346,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
return -1; return -1;
} }
SClientHbBatchRsp batchRsp = {0}; SClientHbBatchRsp batchRsp = {0};
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
...@@ -916,4 +918,4 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) { ...@@ -916,4 +918,4 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
int32_t mndGetNumOfConnections(SMnode *pMnode) { int32_t mndGetNumOfConnections(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
return taosCacheGetNumOfObj(pMgmt->cache); return taosCacheGetNumOfObj(pMgmt->cache);
} }
\ No newline at end of file
...@@ -652,24 +652,26 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pB ...@@ -652,24 +652,26 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pB
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
cols = 0; cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]);
colDataAppend(pColInfo, numOfRows, (const char*) name, false); colDataAppend(pColInfo, numOfRows, (const char*) name, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
const char* src = pUser->superUser? "super":"normal"; const char* src = pUser->superUser? "super":"normal";
char b[10+VARSTR_HEADER_SIZE] = {0}; char b[10+VARSTR_HEADER_SIZE] = {0};
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src)); STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src));
colDataAppend(pColInfo, numOfRows, (const char*) b, false); colDataAppend(pColInfo, numOfRows, (const char*) b, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppend(pColInfo, numOfRows, (const char*) &pUser->createdTime, false); colDataAppend(pColInfo, numOfRows, (const char*) &pUser->createdTime, false);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols); pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]);
colDataAppend(pColInfo, numOfRows, (const char*) name, false); colDataAppend(pColInfo, numOfRows, (const char*) name, false);
......
...@@ -10,21 +10,17 @@ target_sources( ...@@ -10,21 +10,17 @@ target_sources(
"src/vnd/vnodeCommit.c" "src/vnd/vnodeCommit.c"
"src/vnd/vnodeInt.c" "src/vnd/vnodeInt.c"
"src/vnd/vnodeMain.c" "src/vnd/vnodeMain.c"
"src/vnd/vnodeMgr.c"
"src/vnd/vnodeQuery.c" "src/vnd/vnodeQuery.c"
"src/vnd/vnodeStateMgr.c" "src/vnd/vnodeStateMgr.c"
"src/vnd/vnodeWrite.c" "src/vnd/vnodeWrite.c"
"src/vnd/vnodeModule.c"
# "src/vnd/vnodeMgr.c"
# meta # meta
# "src/meta/metaBDBImpl.c" # "src/meta/metaBDBImpl.c"
"src/meta/metaCache.c"
"src/meta/metaCfg.c"
"src/meta/metaIdx.c" "src/meta/metaIdx.c"
"src/meta/metaMain.c" "src/meta/metaMain.c"
"src/meta/metaQuery.c"
"src/meta/metaTable.c" "src/meta/metaTable.c"
"src/meta/metaTbCfg.c"
"src/meta/metaTbTag.c"
"src/meta/metaTbUid.c" "src/meta/metaTbUid.c"
"src/meta/metaTDBImpl.c" "src/meta/metaTDBImpl.c"
......
...@@ -60,34 +60,15 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); ...@@ -60,34 +60,15 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
// meta // meta
typedef struct SMeta SMeta; // todo: remove typedef struct SMeta SMeta; // todo: remove
typedef struct SMTbCursor SMTbCursor; // todo: remove typedef struct SMTbCursor SMTbCursor;
typedef struct SMCtbCursor SMCtbCursor; // todo: remove
typedef struct SMSmaCursor SMSmaCursor; // todo: remove
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
typedef SVCreateTbReq STbCfg; typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg; typedef SVCreateTSmaReq SSmaCfg;
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); void metaCloseTbCursor(SMTbCursor *pTbCur);
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode); char *metaTbCursorNext(SMTbCursor *pTbCur);
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup);
int metaGetTbNum(SMeta *pMeta);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
char *metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor *pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur);
// tsdb // tsdb
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
...@@ -98,18 +79,7 @@ typedef void *tsdbReaderT; ...@@ -98,18 +79,7 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
STfs *pTfs);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb);
int32_t tsdbInitSma(STsdb *pTsdb);
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg);
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
...@@ -127,18 +97,8 @@ SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumn ...@@ -127,18 +97,8 @@ SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumn
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
// tq // tq
enum {
TQ_STREAM_TOKEN__DATA = 1,
TQ_STREAM_TOKEN__WATERMARK,
TQ_STREAM_TOKEN__CHECKPOINT,
};
typedef struct STqReadHandle STqReadHandle; typedef struct STqReadHandle STqReadHandle;
...@@ -153,9 +113,6 @@ int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockIn ...@@ -153,9 +113,6 @@ int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockIn
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
// need to reposition // need to reposition
typedef struct SMgmtWrapper SMgmtWrapper;
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
// structs // structs
struct SMetaCfg { struct SMetaCfg {
...@@ -202,21 +159,6 @@ struct SVnodeCfg { ...@@ -202,21 +159,6 @@ struct SVnodeCfg {
int8_t hashMethod; int8_t hashMethod;
}; };
struct STqReadHandle {
int64_t ver;
int64_t tbUid;
SHashObj *tbIdHash;
const SSubmitReq *pMsg;
SSubmitBlk *pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta *pVnodeMeta;
SArray *pColIdList; // SArray<int32_t>
int32_t sver;
SSchemaWrapper *pSchemaWrapper;
STSchema *pSchema;
};
struct SDataStatis { struct SDataStatis {
int16_t colId; int16_t colId;
int16_t maxIndex; int16_t maxIndex;
...@@ -241,22 +183,6 @@ typedef struct { ...@@ -241,22 +183,6 @@ typedef struct {
uint64_t uid; uint64_t uid;
} STableKeyInfo; } STableKeyInfo;
typedef struct STable {
uint64_t tid;
uint64_t uid;
STSchema *pSchema;
} STable;
typedef struct {
int8_t type;
int8_t reserved[7];
union {
void *data;
int64_t wmTs;
int64_t checkpointId;
};
} STqStreamToken;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,20 +20,48 @@ ...@@ -20,20 +20,48 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SMetaCache SMetaCache; typedef struct SMetaCache SMetaCache;
typedef struct SMetaIdx SMetaIdx; typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB; typedef struct SMetaDB SMetaDB;
typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMSmaCursor SMSmaCursor;
SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF); // metaDebug ==================
void metaClose(SMeta* pMeta); // clang-format off
void metaRemove(const char* path); #define metaFatal(...) do { if (metaDebugFlag & DEBUG_FATAL) { taosPrintLog("META FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); #define metaError(...) do { if (metaDebugFlag & DEBUG_ERROR) { taosPrintLog("META ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
int metaDropTable(SMeta* pMeta, tb_uid_t uid); #define metaWarn(...) do { if (metaDebugFlag & DEBUG_WARN) { taosPrintLog("META WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
int metaCommit(SMeta* pMeta); #define metaInfo(...) do { if (metaDebugFlag & DEBUG_INFO) { taosPrintLog("META ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); #define metaDebug(...) do { if (metaDebugFlag & DEBUG_DEBUG) { taosPrintLog("META ", DEBUG_DEBUG, metaDebugFlag, __VA_ARGS__); }} while(0)
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); #define metaTrace(...) do { if (metaDebugFlag & DEBUG_TRACE) { taosPrintLog("META ", DEBUG_TRACE, metaDebugFlag, __VA_ARGS__); }} while(0)
STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid); // clang-format on
STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid);
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF);
void metaClose(SMeta* pMeta);
void metaRemove(const char* path);
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg);
int metaDropTable(SMeta* pMeta, tb_uid_t uid);
int metaCommit(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid);
STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup);
int metaGetTbNum(SMeta* pMeta);
SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor* pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
// SMetaDB // SMetaDB
int metaOpenDB(SMeta* pMeta); int metaOpenDB(SMeta* pMeta);
...@@ -47,11 +75,6 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); ...@@ -47,11 +75,6 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
int metaOpenCache(SMeta* pMeta); int metaOpenCache(SMeta* pMeta);
void metaCloseCache(SMeta* pMeta); void metaCloseCache(SMeta* pMeta);
// SMetaCfg
extern const SMetaCfg defaultMetaOptions;
// int metaValidateOptions(const SMetaCfg*);
void metaOptionsCopy(SMetaCfg* pDest, const SMetaCfg* pSrc);
// SMetaIdx // SMetaIdx
int metaOpenIdx(SMeta* pMeta); int metaOpenIdx(SMeta* pMeta);
void metaCloseIdx(SMeta* pMeta); void metaCloseIdx(SMeta* pMeta);
......
...@@ -20,48 +20,21 @@ ...@@ -20,48 +20,21 @@
extern "C" { extern "C" {
#endif #endif
// tqInt.h // tqDebug ===================
#define tqFatal(...) \ // clang-format off
{ \ #define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
if (tqDebugFlag & DEBUG_FATAL) { \ #define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ #define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
} \ #define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
} #define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqError(...) \ // clang-format on
{ \
if (tqDebugFlag & DEBUG_ERROR) { \ enum {
taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ TQ_STREAM_TOKEN__DATA = 1,
} \ TQ_STREAM_TOKEN__WATERMARK,
} TQ_STREAM_TOKEN__CHECKPOINT,
};
#define tqWarn(...) \
{ \
if (tqDebugFlag & DEBUG_WARN) { \
taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
}
#define tqInfo(...) \
{ \
if (tqDebugFlag & DEBUG_INFO) { \
taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
}
#define tqDebug(...) \
{ \
if (tqDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \
} \
}
#define tqTrace(...) \
{ \
if (tqDebugFlag & DEBUG_TRACE) { \
taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \
} \
}
#define TQ_BUFFER_SIZE 4 #define TQ_BUFFER_SIZE 4
...@@ -105,6 +78,31 @@ typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; ...@@ -105,6 +78,31 @@ typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
typedef struct STqOffsetCfg STqOffsetCfg; typedef struct STqOffsetCfg STqOffsetCfg;
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
struct STqReadHandle {
int64_t ver;
int64_t tbUid;
SHashObj* tbIdHash;
const SSubmitReq* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pVnodeMeta;
SArray* pColIdList; // SArray<int32_t>
int32_t sver;
SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema;
};
typedef struct {
int8_t type;
int8_t reserved[7];
union {
void* data;
int64_t wmTs;
int64_t checkpointId;
};
} STqStreamToken;
typedef struct { typedef struct {
int16_t ver; int16_t ver;
int16_t action; int16_t action;
...@@ -248,6 +246,25 @@ typedef struct { ...@@ -248,6 +246,25 @@ typedef struct {
static STqPushMgmt tqPushMgmt; static STqPushMgmt tqPushMgmt;
// init once
int tqInit();
void tqCleanUp();
// open in each vnode
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig,
SMemAllocatorFactory* allocFac);
void tqClose(STQ*);
// required by vnode
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VND_H_
#define _TD_VND_H_
#ifdef __cplusplus
extern "C" {
#endif
// vnodeDebug ====================
// clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
// vnodeModule ====================
int vnodeScheduleTask(int (*execute)(void*), void* arg);
// vnodeQuery ====================
int vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
#if 1
// SVBufPool
int vnodeOpenBufPool(SVnode* pVnode);
void vnodeCloseBufPool(SVnode* pVnode);
int vnodeBufPoolSwitch(SVnode* pVnode);
int vnodeBufPoolRecycle(SVnode* pVnode);
void* vnodeMalloc(SVnode* pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode* pVnode);
SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode);
// SVMemAllocator
typedef struct SVArenaNode {
TD_SLIST_NODE(SVArenaNode);
uint64_t size; // current node size
void* ptr;
char data[];
} SVArenaNode;
typedef struct SVMemAllocator {
T_REF_DECLARE()
TD_DLIST_NODE(SVMemAllocator);
uint64_t capacity;
uint64_t ssize;
uint64_t lsize;
SVArenaNode* pNode;
TD_SLIST(SVArenaNode) nlist;
} SVMemAllocator;
SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize);
void vmaDestroy(SVMemAllocator* pVMA);
void vmaReset(SVMemAllocator* pVMA);
void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
void vmaFree(SVMemAllocator* pVMA, void* ptr);
bool vmaIsFull(SVMemAllocator* pVMA);
// vnodeCfg.h
extern const SVnodeCfg defaultVnodeOptions;
int vnodeValidateOptions(const SVnodeCfg*);
void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc);
// For commit
#define vnodeShouldCommit vnodeBufPoolIsFull
int vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode);
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_VND_H_*/
\ No newline at end of file
...@@ -35,36 +35,21 @@ ...@@ -35,36 +35,21 @@
#include "tstream.h" #include "tstream.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "vnode.h"
#include "wal.h" #include "wal.h"
#include "vnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct STQ STQ; typedef struct SMeta SMeta;
typedef struct STsdb STsdb;
typedef struct STQ STQ;
typedef struct SVState SVState; typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool; typedef struct SVBufPool SVBufPool;
typedef struct SQWorkerMgmt SQHandle; typedef struct SQWorkerMgmt SQHandle;
typedef struct SVnodeTask {
TD_DLIST_NODE(SVnodeTask);
void* arg;
int (*execute)(void*);
} SVnodeTask;
typedef struct SVnodeMgr {
td_mode_flag_t vnodeInitFlag;
// For commit
bool stop;
uint16_t nthreads;
TdThread* threads;
TdThreadMutex mutex;
TdThreadCond hasTask;
TD_DLIST(SVnodeTask) queue;
} SVnodeMgr;
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other
int8_t dstType; int8_t dstType;
...@@ -80,8 +65,6 @@ typedef struct { ...@@ -80,8 +65,6 @@ typedef struct {
SHashObj* pHash; // streamId -> SStreamSinkInfo SHashObj* pHash; // streamId -> SStreamSinkInfo
} SSink; } SSink;
extern SVnodeMgr vnodeMgr;
// SVState // SVState
struct SVState { struct SVState {
int64_t processed; int64_t processed;
...@@ -106,118 +89,11 @@ struct SVnode { ...@@ -106,118 +89,11 @@ struct SVnode {
STfs* pTfs; STfs* pTfs;
}; };
int vnodeScheduleTask(SVnodeTask* task);
int vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
#define vFatal(...) \
do { \
if (vDebugFlag & DEBUG_FATAL) { \
taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \
} while (0)
#define vError(...) \
do { \
if (vDebugFlag & DEBUG_ERROR) { \
taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
} while (0)
#define vWarn(...) \
do { \
if (vDebugFlag & DEBUG_WARN) { \
taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
} while (0)
#define vInfo(...) \
do { \
if (vDebugFlag & DEBUG_INFO) { \
taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
} while (0)
#define vDebug(...) \
do { \
if (vDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("VND ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define vTrace(...) \
do { \
if (vDebugFlag & DEBUG_TRACE) { \
taosPrintLog("VND ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); \
} \
} while (0)
// vnodeCfg.h
extern const SVnodeCfg defaultVnodeOptions;
int vnodeValidateOptions(const SVnodeCfg*);
void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc);
// For commit
#define vnodeShouldCommit vnodeBufPoolIsFull
int vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode);
// SVBufPool
int vnodeOpenBufPool(SVnode* pVnode);
void vnodeCloseBufPool(SVnode* pVnode);
int vnodeBufPoolSwitch(SVnode* pVnode);
int vnodeBufPoolRecycle(SVnode* pVnode);
void* vnodeMalloc(SVnode* pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode* pVnode);
SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode);
// SVMemAllocator
typedef struct SVArenaNode {
TD_SLIST_NODE(SVArenaNode);
uint64_t size; // current node size
void* ptr;
char data[];
} SVArenaNode;
typedef struct SVMemAllocator {
T_REF_DECLARE()
TD_DLIST_NODE(SVMemAllocator);
uint64_t capacity;
uint64_t ssize;
uint64_t lsize;
SVArenaNode* pNode;
TD_SLIST(SVArenaNode) nlist;
} SVMemAllocator;
SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize);
void vmaDestroy(SVMemAllocator* pVMA);
void vmaReset(SVMemAllocator* pVMA);
void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
void vmaFree(SVMemAllocator* pVMA, void* ptr);
bool vmaIsFull(SVMemAllocator* pVMA);
// init once
int tqInit();
void tqCleanUp();
// open in each vnode
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig,
SMemAllocatorFactory* allocFac);
void tqClose(STQ*);
// required by vnode
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
// sma // sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
#include "vnd.h"
#include "meta.h" #include "meta.h"
#include "tsdb.h" #include "tsdb.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
struct SMetaCache {
// TODO
};
int metaOpenCache(SMeta *pMeta) {
// TODO
// if (pMeta->options.lruSize) {
// pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize);
// if (pMeta->pCache == NULL) {
// // TODO: handle error
// return -1;
// }
// }
return 0;
}
void metaCloseCache(SMeta *pMeta) {
// if (pMeta->pCache) {
// rocksdb_cache_destroy(pMeta->pCache);
// pMeta->pCache = NULL;
// }
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
const SMetaCfg defaultMetaOptions = {.lruSize = 0};
/* ------------------------ EXPOSED METHODS ------------------------ */
int metaValidateOptions(const SMetaCfg *pMetaOptions) {
// TODO
return 0;
}
void metaOptionsCopy(SMetaCfg *pDest, const SMetaCfg *pSrc) { memcpy(pDest, pSrc, sizeof(*pSrc)); }
/* ------------------------ STATIC METHODS ------------------------ */
\ No newline at end of file
...@@ -25,17 +25,6 @@ static void metaCloseImpl(SMeta *pMeta); ...@@ -25,17 +25,6 @@ static void metaCloseImpl(SMeta *pMeta);
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
// Set default options
if (pMetaCfg == NULL) {
pMetaCfg = &defaultMetaOptions;
}
// // Validate the options
// if (metaValidateOptions(pMetaCfg) < 0) {
// // TODO: deal with error
// return NULL;
// }
// Allocate handle // Allocate handle
pMeta = metaNew(path, pMetaCfg, pMAF); pMeta = metaNew(path, pMetaCfg, pMAF);
if (pMeta == NULL) { if (pMeta == NULL) {
...@@ -80,9 +69,6 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorF ...@@ -80,9 +69,6 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorF
return NULL; return NULL;
} }
metaOptionsCopy(&(pMeta->options), pMetaCfg);
pMeta->pmaf = pMAF;
return pMeta; return pMeta;
}; };
...@@ -94,13 +80,6 @@ static void metaFree(SMeta *pMeta) { ...@@ -94,13 +80,6 @@ static void metaFree(SMeta *pMeta) {
} }
static int metaOpenImpl(SMeta *pMeta) { static int metaOpenImpl(SMeta *pMeta) {
// Open meta cache
if (metaOpenCache(pMeta) < 0) {
// TODO: handle error
metaCloseImpl(pMeta);
return -1;
}
// Open meta db // Open meta db
if (metaOpenDB(pMeta) < 0) { if (metaOpenDB(pMeta) < 0) {
// TODO: handle error // TODO: handle error
...@@ -129,5 +108,4 @@ static void metaCloseImpl(SMeta *pMeta) { ...@@ -129,5 +108,4 @@ static void metaCloseImpl(SMeta *pMeta) {
metaCloseUidGnrt(pMeta); metaCloseUidGnrt(pMeta);
metaCloseIdx(pMeta); metaCloseIdx(pMeta);
metaCloseDB(pMeta); metaCloseDB(pMeta);
metaCloseCache(pMeta);
} }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
int metaValidateTbCfg(SMeta *pMeta, const STbCfg *pTbOptions) {
// TODO
return 0;
}
size_t metaEncodeTbObjFromTbOptions(const STbCfg *pTbOptions, void *pBuf, size_t bsize) {
void **ppBuf = &pBuf;
int tlen = 0;
tlen += taosEncodeFixedU8(ppBuf, pTbOptions->type);
tlen += taosEncodeString(ppBuf, pTbOptions->name);
tlen += taosEncodeFixedU32(ppBuf, pTbOptions->ttl);
switch (pTbOptions->type) {
case META_SUPER_TABLE:
tlen += taosEncodeFixedU64(ppBuf, pTbOptions->stbCfg.suid);
tlen += tdEncodeSchema(ppBuf, (STSchema *)pTbOptions->stbCfg.pTagSchema);
// TODO: encode schema version array
break;
case META_CHILD_TABLE:
tlen += taosEncodeFixedU64(ppBuf, pTbOptions->ctbCfg.suid);
break;
case META_NORMAL_TABLE:
// TODO: encode schema version array
break;
default:
break;
}
return tlen;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
\ No newline at end of file
...@@ -24,16 +24,10 @@ int vnodeAsyncCommit(SVnode *pVnode) { ...@@ -24,16 +24,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
vnodeWaitCommit(pVnode); vnodeWaitCommit(pVnode);
vnodeBufPoolSwitch(pVnode); vnodeBufPoolSwitch(pVnode);
SVnodeTask *pTask = (SVnodeTask *)taosMemoryMalloc(sizeof(*pTask));
pTask->execute = vnodeCommit; // TODO
pTask->arg = pVnode; // TODO
tsdbPrepareCommit(pVnode->pTsdb); tsdbPrepareCommit(pVnode->pTsdb);
// metaPrepareCommit(pVnode->pMeta);
// walPreapareCommit(pVnode->pWal);
vnodeScheduleTask(pTask); vnodeScheduleTask(vnodeCommit, pVnode);
return 0; return 0;
} }
......
...@@ -15,31 +15,55 @@ ...@@ -15,31 +15,55 @@
#include "vnodeInt.h" #include "vnodeInt.h"
SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; typedef struct SVnodeTask SVnodeTask;
struct SVnodeTask {
SVnodeTask* next;
SVnodeTask* prev;
int (*execute)(void*);
void* arg;
};
struct SVnodeGlobal {
int8_t init;
int8_t stop;
int nthreads;
TdThread* threads;
TdThreadMutex mutex;
TdThreadCond hasTask;
SVnodeTask queue;
};
struct SVnodeGlobal vnodeGlobal;
static void* loop(void* arg); static void* loop(void* arg);
int vnodeInit() { int vnodeInit(int nthreads) {
if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { int8_t init;
int ret;
init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 0, 1);
if (init) {
return 0; return 0;
} }
vnodeMgr.stop = false; vnodeGlobal.stop = 0;
vnodeGlobal.queue.next = &vnodeGlobal.queue;
vnodeGlobal.queue.prev = &vnodeGlobal.queue;
// Start commit handers vnodeGlobal.nthreads = nthreads;
vnodeMgr.nthreads = tsNumOfCommitThreads; vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread));
vnodeMgr.threads = taosMemoryCalloc(vnodeMgr.nthreads, sizeof(TdThread)); if (vnodeGlobal.threads == NULL) {
if (vnodeMgr.threads == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY;
vError("failed to init vnode module since: %s", tstrerror(terrno));
return -1; return -1;
} }
taosThreadMutexInit(&(vnodeMgr.mutex), NULL); taosThreadMutexInit(&vnodeGlobal.mutex, NULL);
taosThreadCondInit(&(vnodeMgr.hasTask), NULL); taosThreadCondInit(&vnodeGlobal.hasTask, NULL);
TD_DLIST_INIT(&(vnodeMgr.queue));
for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) { for (int i = 0; i < nthreads; i++) {
taosThreadCreate(&(vnodeMgr.threads[i]), NULL, loop, NULL); taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL);
// pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
} }
if (walInit() < 0) { if (walInit() < 0) {
...@@ -50,62 +74,83 @@ int vnodeInit() { ...@@ -50,62 +74,83 @@ int vnodeInit() {
} }
void vnodeCleanup() { void vnodeCleanup() {
if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_UNINITIALIZED) { int8_t init;
return;
}
// Stop commit handler init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 1, 0);
taosThreadMutexLock(&(vnodeMgr.mutex)); if (init == 0) return;
vnodeMgr.stop = true;
taosThreadCondBroadcast(&(vnodeMgr.hasTask));
taosThreadMutexUnlock(&(vnodeMgr.mutex));
for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) { // set stop
taosThreadJoin(vnodeMgr.threads[i], NULL); taosThreadMutexLock(&(vnodeGlobal.mutex));
vnodeGlobal.stop = 1;
taosThreadCondBroadcast(&(vnodeGlobal.hasTask));
taosThreadMutexUnlock(&(vnodeGlobal.mutex));
// wait for threads
for (int i = 0; i < vnodeGlobal.nthreads; i++) {
taosThreadJoin(vnodeGlobal.threads[i], NULL);
} }
taosMemoryFreeClear(vnodeMgr.threads); // clear source
taosThreadCondDestroy(&(vnodeMgr.hasTask)); taosMemoryFreeClear(vnodeGlobal.threads);
taosThreadMutexDestroy(&(vnodeMgr.mutex)); taosThreadCondDestroy(&(vnodeGlobal.hasTask));
taosThreadMutexDestroy(&(vnodeGlobal.mutex));
} }
int vnodeScheduleTask(SVnodeTask* pTask) { int vnodeScheduleTask(int (*execute)(void*), void* arg) {
taosThreadMutexLock(&(vnodeMgr.mutex)); SVnodeTask* pTask;
TD_DLIST_APPEND(&(vnodeMgr.queue), pTask); ASSERT(!vnodeGlobal.stop);
taosThreadCondSignal(&(vnodeMgr.hasTask)); pTask = taosMemoryMalloc(sizeof(*pTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pTask->execute = execute;
pTask->arg = arg;
taosThreadMutexUnlock(&(vnodeMgr.mutex)); taosThreadMutexLock(&(vnodeGlobal.mutex));
pTask->next = &vnodeGlobal.queue;
pTask->prev = vnodeGlobal.queue.prev;
vnodeGlobal.queue.prev->next = pTask;
vnodeGlobal.queue.prev = pTask;
taosThreadCondSignal(&(vnodeGlobal.hasTask));
taosThreadMutexUnlock(&(vnodeGlobal.mutex));
return 0; return 0;
} }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) { static void* loop(void* arg) {
SVnodeTask* pTask;
int ret;
setThreadName("vnode-commit"); setThreadName("vnode-commit");
SVnodeTask* pTask;
for (;;) { for (;;) {
taosThreadMutexLock(&(vnodeMgr.mutex)); taosThreadMutexLock(&(vnodeGlobal.mutex));
for (;;) { for (;;) {
pTask = TD_DLIST_HEAD(&(vnodeMgr.queue)); pTask = vnodeGlobal.queue.next;
if (pTask == NULL) { if (pTask == &vnodeGlobal.queue) {
if (vnodeMgr.stop) { // no task
taosThreadMutexUnlock(&(vnodeMgr.mutex)); if (vnodeGlobal.stop) {
taosThreadMutexUnlock(&(vnodeGlobal.mutex));
return NULL; return NULL;
} else { } else {
taosThreadCondWait(&(vnodeMgr.hasTask), &(vnodeMgr.mutex)); taosThreadCondWait(&(vnodeGlobal.hasTask), &(vnodeGlobal.mutex));
} }
} else { } else {
TD_DLIST_POP(&(vnodeMgr.queue), pTask); // has task
pTask->prev->next = pTask->next;
pTask->next->prev = pTask->prev;
break; break;
} }
} }
taosThreadMutexUnlock(&(vnodeMgr.mutex)); taosThreadMutexUnlock(&(vnodeGlobal.mutex));
(*(pTask->execute))(pTask->arg); pTask->execute(pTask->arg);
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }
......
...@@ -173,12 +173,12 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o ...@@ -173,12 +173,12 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor); int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
tw->skey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision); tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision);
mon = (int)(mon + interval); mon = (int)(mon + interval);
tm.tm_year = mon / 12; tm.tm_year = mon / 12;
tm.tm_mon = mon % 12; tm.tm_mon = mon % 12;
tw->ekey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision); tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision);
tw->ekey -= 1; tw->ekey -= 1;
} }
......
...@@ -312,6 +312,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time ...@@ -312,6 +312,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
if (pToken->type == TK_NOW) { if (pToken->type == TK_NOW) {
ts = taosGetTimestamp(timePrec); ts = taosGetTimestamp(timePrec);
} else if (pToken->type == TK_TODAY) {
ts = taosGetTimestampToday(timePrec);
} else if (pToken->type == TK_NK_INTEGER) { } else if (pToken->type == TK_NK_INTEGER) {
bool isSigned = false; bool isSigned = false;
toInteger(pToken->z, pToken->n, 10, &ts, &isSigned); toInteger(pToken->z, pToken->n, 10, &ts, &isSigned);
...@@ -376,8 +378,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time ...@@ -376,8 +378,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
} }
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
if ((pToken->type != TK_NOW && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL && if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT &&
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) || pToken->type != TK_NK_BOOL && pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) ||
(pToken->n == 0) || (pToken->type == TK_NK_RP)) { (pToken->n == 0) || (pToken->type == TK_NK_RP)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
} }
......
...@@ -331,6 +331,19 @@ static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCo ...@@ -331,6 +331,19 @@ static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t getNumOfNullEntries(SColumnInfoData *pColumnInfoData, int32_t numOfRows) {
int32_t numOfNulls = 0;
if (!pColumnInfoData->hasNull) {
return numOfNulls;
}
for (int i = 0; i < numOfRows; ++i) {
if (pColumnInfoData->varmeta.offset[i] == -1) {
numOfNulls++;
}
}
return numOfNulls;
}
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -363,10 +376,12 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -363,10 +376,12 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
factor = TSDB_NCHAR_SIZE; factor = TSDB_NCHAR_SIZE;
} }
int32_t numOfNulls = getNumOfNullEntries(pInputData[i], pInput[i].numOfRows);
if (pInput[i].numOfRows == 1) { if (pInput[i].numOfRows == 1) {
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * numOfRows; inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * (numOfRows - numOfNulls);
} else { } else {
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; inputLen += pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE;
} }
} }
...@@ -444,13 +459,15 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -444,13 +459,15 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
factor = TSDB_NCHAR_SIZE; factor = TSDB_NCHAR_SIZE;
} }
int32_t numOfNulls = getNumOfNullEntries(pInputData[i], pInput[i].numOfRows);
if (i == 0) { if (i == 0) {
// calculate required separator space // calculate required separator space
inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor; inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * (numOfRows - numOfNulls) * (inputNum - 2) * factor;
} else if (pInput[i].numOfRows == 1) { } else if (pInput[i].numOfRows == 1) {
inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * factor; inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * (numOfRows - numOfNulls) * factor;
} else { } else {
inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE; inputLen += pInputData[i]->varmeta.length - (numOfRows - numOfNulls) * VARSTR_HEADER_SIZE;
} }
} }
...@@ -841,7 +858,7 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * ...@@ -841,7 +858,7 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS); memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS);
} }
struct tm *tmInfo = localtime((const time_t *)&timeVal); struct tm *tmInfo = taosLocalTime((const time_t *)&timeVal, NULL);
strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S%z", tmInfo); strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S%z", tmInfo);
int32_t len = (int32_t)strlen(buf); int32_t len = (int32_t)strlen(buf);
......
...@@ -406,7 +406,18 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) { ...@@ -406,7 +406,18 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
#endif #endif
} }
time_t taosTime(time_t *t) {
return time(t);
}
time_t taosMktime(struct tm *timep) {
return mktime(timep);
}
struct tm *taosLocalTime(const time_t *timep, struct tm *result) { struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
if (result == NULL) {
return localtime(timep);
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
localtime_s(result, timep); localtime_s(result, timep);
#else #else
......
...@@ -56,8 +56,8 @@ ...@@ -56,8 +56,8 @@
# ---- tmq # ---- tmq
./test.sh -f tsim/tmq/basic.sim ./test.sh -f tsim/tmq/basic.sim
./test.sh -f tsim/tmq/basic1.sim ./test.sh -f tsim/tmq/basic1.sim
./test.sh -f tsim/tmq/oneTopic.sim #./test.sh -f tsim/tmq/oneTopic.sim
./test.sh -f tsim/tmq/multiTopic.sim #./test.sh -f tsim/tmq/multiTopic.sim
#./test.sh -f tsim/tmq/mainConsumerInMultiTopic.sim #./test.sh -f tsim/tmq/mainConsumerInMultiTopic.sim
#./test.sh -f tsim/tmq/mainConsumerInOneTopic.sim #./test.sh -f tsim/tmq/mainConsumerInOneTopic.sim
......
...@@ -83,6 +83,14 @@ sql insert into ntb5 values ("2022-01-01 00:00:00.000" , "0123456789" , "0123456 ...@@ -83,6 +83,14 @@ sql insert into ntb5 values ("2022-01-01 00:00:00.000" , "0123456789" , "0123456
sql insert into ctb5 values ("2022-01-01 00:00:00.001" , NULL , NULL ) sql insert into ctb5 values ("2022-01-01 00:00:00.001" , NULL , NULL )
sql insert into ntb5 values ("2022-01-01 00:00:00.001" , NULL , NULL ) sql insert into ntb5 values ("2022-01-01 00:00:00.001" , NULL , NULL )
sql create table stb3 (ts timestamp, c1 binary(64), c2 nchar(64), c3 nchar(64) ) tags (t1 nchar(64))
sql create table ctb6 using stb3 tags("tag-nchar-6")
sql create table ntb6 (ts timestamp, c1 binary(64), c2 nchar(64), c3 nchar(64) )
sql insert into ctb6 values ("2022-01-01 00:00:00.000" , "0123456789" , "中文测试1" , "中文测试2" )
sql insert into ntb6 values ("2022-01-01 00:00:00.000" , "0123456789" , "中文测试01", "中文测试01" )
sql insert into ctb6 values ("2022-01-01 00:00:00.001" , NULL , NULL, NULL )
sql insert into ntb6 values ("2022-01-01 00:00:00.001" , NULL , NULL, NULL )
$loop_test = 0 $loop_test = 0
loop_test_pos: loop_test_pos:
...@@ -150,6 +158,211 @@ if $data01 != 12 then ...@@ -150,6 +158,211 @@ if $data01 != 12 then
return -1 return -1
endi endi
print ====> select c2 ,length(c2), char_length(c2) from ctb6
sql select c2 ,length(c2), char_length(c2) from ctb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data01 != 20 then
return -1
endi
if $data02 != 5 then
return -1
endi
if $data11 != NULL then
return -1
endi
print ====> select c2 ,length(c2),char_length(c2) from ntb6
sql select c2 ,length(c2),char_length(c2) from ntb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data01 != 24 then
return -1
endi
if $data02 != 6 then
return -1
endi
if $data11 != NULL then
return -1
endi
print ====> select c2 ,lower(c2), upper(c2) from ctb6
sql select c2 ,lower(c2), upper(c2) from ctb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data01 != 中文测试1 then
return -1
endi
if $data02 != 中文测试1 then
return -1
endi
if $data11 != NULL then
return -1
endi
print ====> select c2 ,lower(c2), upper(c2) from ntb6
sql select c2 ,lower(c2), upper(c2) from ntb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data01 != 中文测试01 then
return -1
endi
if $data02 != 中文测试01 then
return -1
endi
if $data11 != NULL then
return -1
endi
print ====> select c2, ltrim(c2), ltrim(c2) from ctb6
sql select c2, ltrim(c2), ltrim(c2) from ctb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data01 != 中文测试1 then
return -1
endi
if $data02 != 中文测试1 then
return -1
endi
if $data11 != NULL then
return -1
endi
print ====> select c2, ltrim(c2), ltrim(c2) from ntb6
sql select c2, ltrim(c2), ltrim(c2) from ntb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data01 != 中文测试01 then
return -1
endi
if $data02 != 中文测试01 then
return -1
endi
if $data11 != NULL then
return -1
endi
print ====> select c2, c3 , concat(c2,c3) from ctb6
sql select c2, c3 , concat(c2,c3) from ctb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data02 != 中文测试1中文测试2 then
return -1
endi
if $data12 != NULL then
return -1
endi
return
print ====> select c2, c3 , concat(c2,c3) from ntb6
sql select c2, c3 , concat(c2,c3) from ntb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data02 != 中文测试01中文测试01 then
return -1
endi
if $data12 != NULL then
return -1
endi
print ====> select c2, c3 , concat_ws('_', c2, c3) from ctb6
sql select c2, c3 , concat_ws('_', c2, c3) from ctb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data02 != 中文测试1_中文测试2 then
return -1
endi
# if $data12 != NULL then
# return -1
# endi
print ====> select c2, c3 , concat_ws('_', c2, c3) from ntb6
sql select c2, c3 , concat_ws('_', c2, c3) from ntb6
print ====> rows: $rows
print ====> $data00 $data01 $data02
print ====> $data10 $data11 $data12
if $rows != 2 then
return -1
endi
if $data02 != 中文测试01_中文测试01 then
return -1
endi
# if $data12 != NULL then
# return -1
# endi
print ====> select c2, substr(c2,1, 4) from ctb6
sql select c2, substr(c2,1, 4) from ctb6
print ====> rows: $rows
print ====> $data00 $data01
print ====> $data10 $data11
if $rows != 2 then
return -1
endi
if $data00 != 中文测试1 then
return -1
endi
if $data01 != 中文测试 then
return -1
endi
# if $data11 != NULL then
# return -1
# endi
print ====> select c2, substr(c2,1, 4) from ntb6
sql select c2, substr(c2,1, 4) from ntb6
print ====> rows: $rows
print ====> $data00 $data01
print ====> $data10 $data11
if $rows != 2 then
return -1
endi
if $data00 != 中文测试01 then
return -1
endi
if $data01 != 中文测试 then
return -1
endi
if $data11 != NULL then
return -1
endi
#sql_error select c1, length(t1), c2, length(t2) from ctb0 #sql_error select c1, length(t1), c2, length(t2) from ctb0
print ====> char_length print ====> char_length
...@@ -493,7 +706,7 @@ if $loop_test == 0 then ...@@ -493,7 +706,7 @@ if $loop_test == 0 then
print =============== stop and restart taosd print =============== stop and restart taosd
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0 $loop_cnt = 0
check_dnode_ready_0: check_dnode_ready_0:
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
...@@ -511,7 +724,7 @@ if $loop_test == 0 then ...@@ -511,7 +724,7 @@ if $loop_test == 0 then
goto check_dnode_ready_0 goto check_dnode_ready_0
endi endi
$loop_test = 1 $loop_test = 1
goto loop_test_pos goto loop_test_pos
endi endi
......
此差异已折叠。
...@@ -35,8 +35,8 @@ ...@@ -35,8 +35,8 @@
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
typedef struct { typedef struct {
int32_t expectMsgCnt; int32_t expectMsgCnt;
int32_t consumeMsgCnt; int32_t consumeMsgCnt;
TdThread thread; TdThread thread;
} SThreadInfo; } SThreadInfo;
...@@ -45,12 +45,12 @@ typedef struct { ...@@ -45,12 +45,12 @@ typedef struct {
char dbName[32]; char dbName[32];
char topicString[256]; char topicString[256];
char keyString[1024]; char keyString[1024];
char topicString1[256]; char topicString1[256];
char keyString1[1024]; char keyString1[1024];
int32_t showMsgFlag; int32_t showMsgFlag;
int32_t consumeDelay; // unit s int32_t consumeDelay; // unit s
int32_t consumeMsgCnt; int32_t consumeMsgCnt;
int32_t checkMode; int32_t checkMode;
// save result after parse agrvs // save result after parse agrvs
int32_t numOfTopic; int32_t numOfTopic;
...@@ -59,13 +59,13 @@ typedef struct { ...@@ -59,13 +59,13 @@ typedef struct {
int32_t numOfKey; int32_t numOfKey;
char key[32][64]; char key[32][64];
char value[32][64]; char value[32][64];
int32_t numOfTopic1; int32_t numOfTopic1;
char topics1[32][64]; char topics1[32][64];
int32_t numOfKey1; int32_t numOfKey1;
char key1[32][64]; char key1[32][64];
char value1[32][64]; char value1[32][64];
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
...@@ -186,18 +186,18 @@ void parseInputString() { ...@@ -186,18 +186,18 @@ void parseInputString() {
ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.numOfTopic++; g_stConfInfo.numOfTopic++;
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
token = strtok(g_stConfInfo.topicString1, delim); token = strtok(g_stConfInfo.topicString1, delim);
while(token != NULL) { while (token != NULL) {
//printf("%s\n", token ); // printf("%s\n", token );
strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token); strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token);
ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]); ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]);
//printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.numOfTopic1++; g_stConfInfo.numOfTopic1++;
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
...@@ -214,28 +214,28 @@ void parseInputString() { ...@@ -214,28 +214,28 @@ void parseInputString() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]); // g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey++; g_stConfInfo.numOfKey++;
} }
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
token = strtok(g_stConfInfo.keyString1, delim); token = strtok(g_stConfInfo.keyString1, delim);
while(token != NULL) { while (token != NULL) {
//printf("%s\n", token ); // printf("%s\n", token );
{ {
char* pstr = token; char* pstr = token;
ltrim(pstr); ltrim(pstr);
char *ret = strchr(pstr, ch); char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret-pstr); memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret - pstr);
strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret+1); strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret + 1);
//printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]); // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
g_stConfInfo.numOfKey1++; // g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.numOfKey1++;
} }
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
} }
static int running = 1; static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ /*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
...@@ -253,6 +253,7 @@ int queryDB(TAOS* taos, char* command) { ...@@ -253,6 +253,7 @@ int queryDB(TAOS* taos, char* command) {
} }
tmq_t* build_consumer() { tmq_t* build_consumer() {
#if 0
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
...@@ -266,13 +267,19 @@ tmq_t* build_consumer() { ...@@ -266,13 +267,19 @@ tmq_t* build_consumer() {
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
// tmq_conf_set(conf, "group.id", "tg2"); // tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) {
tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]); tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]);
} }
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq; return tmq;
} }
...@@ -285,10 +292,10 @@ tmq_list_t* build_topic_list() { ...@@ -285,10 +292,10 @@ tmq_list_t* build_topic_list() {
return topic_list; return topic_list;
} }
tmq_t* build_consumer_x() { tmq_t* build_consumer_x() {
#if 0
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -296,23 +303,29 @@ tmq_t* build_consumer_x() { ...@@ -296,23 +303,29 @@ tmq_t* build_consumer_x() {
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
//tmq_conf_set(conf, "group.id", "tg2"); // tmq_conf_set(conf, "group.id", "tg2");
for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) {
tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]); tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]);
} }
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq; return tmq;
} }
tmq_list_t* build_topic_list_x() { tmq_list_t* build_topic_list_x() {
tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
//tmq_list_append(topic_list, "test_stb_topic_1"); // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) {
tmq_list_append(topic_list, g_stConfInfo.topics1[i]); tmq_list_append(topic_list, g_stConfInfo.topics1[i]);
} }
...@@ -367,9 +380,9 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) { ...@@ -367,9 +380,9 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
if (tmqMsg) { if (tmqMsg) {
totalMsgs++; totalMsgs++;
//printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs); // printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
#if 0 #if 0
TAOS_ROW row; TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) { while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++; totalRows++;
...@@ -396,65 +409,63 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) { ...@@ -396,65 +409,63 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
exit(-1); exit(-1);
} }
//printf("%d", totalMsgs); // output to sim for check result // printf("%d", totalMsgs); // output to sim for check result
return totalMsgs; return totalMsgs;
} }
void* threadFunc(void* param) {
void *threadFunc(void *param) {
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
SThreadInfo *pInfo = (SThreadInfo *)param; SThreadInfo* pInfo = (SThreadInfo*)param;
tmq_t* tmq = build_consumer_x(); tmq_t* tmq = build_consumer_x();
tmq_list_t* topic_list = build_topic_list_x(); tmq_list_t* topic_list = build_topic_list_x();
if ((NULL == tmq) || (NULL == topic_list)){ if ((NULL == tmq) || (NULL == topic_list)) {
return NULL; return NULL;
} }
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
if (err) { if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
//if (0 == g_stConfInfo.consumeMsgCnt) { // if (0 == g_stConfInfo.consumeMsgCnt) {
// loop_consume(tmq); // loop_consume(tmq);
//} else { // } else {
pInfo->consumeMsgCnt = parallel_consume(tmq, 1); pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
//} //}
err = tmq_unsubscribe(tmq); err = tmq_unsubscribe(tmq);
if (err) { if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1; pInfo->consumeMsgCnt = -1;
return NULL; return NULL;
} }
return NULL; return NULL;
} }
int main(int32_t argc, char* argv[]) { int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
parseInputString(); parseInputString();
int32_t numOfThreads = 1; int32_t numOfThreads = 1;
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); taosThreadAttrInit(&thattr);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
SThreadInfo *pInfo = (SThreadInfo *)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo)); SThreadInfo* pInfo = (SThreadInfo*)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo));
if (g_stConfInfo.numOfTopic1) { if (g_stConfInfo.numOfTopic1) {
// pthread_create one thread to consume // pthread_create one thread to consume
for (int32_t i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].expectMsgCnt = 0; pInfo[i].expectMsgCnt = 0;
pInfo[i].consumeMsgCnt = 0; pInfo[i].consumeMsgCnt = 0;
taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void*)(pInfo + i));
} }
} }
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
if ((NULL == tmq) || (NULL == topic_list)) { if ((NULL == tmq) || (NULL == topic_list)) {
...@@ -479,48 +490,48 @@ int main(int32_t argc, char* argv[]) { ...@@ -479,48 +490,48 @@ int main(int32_t argc, char* argv[]) {
exit(-1); exit(-1);
} }
if (g_stConfInfo.numOfTopic1) { if (g_stConfInfo.numOfTopic1) {
for (int32_t i = 0; i < numOfThreads; i++) { for (int32_t i = 0; i < numOfThreads; i++) {
taosThreadJoin(pInfo[i].thread, NULL); taosThreadJoin(pInfo[i].thread, NULL);
} }
//printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
if (0 == g_stConfInfo.checkMode) { if (0 == g_stConfInfo.checkMode) {
if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) { if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) {
printf("success"); printf("success");
} else { } else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
} }
} else if (1 == g_stConfInfo.checkMode) { } else if (1 == g_stConfInfo.checkMode) {
if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) { if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) {
printf("success"); printf("success");
} else { } else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
} }
} else if (2 == g_stConfInfo.checkMode) { } else if (2 == g_stConfInfo.checkMode) {
if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) { if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) {
printf("success"); printf("success");
} else { } else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
} }
} else if (3 == g_stConfInfo.checkMode) { } else if (3 == g_stConfInfo.checkMode) {
if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) { if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) {
printf("success"); printf("success");
} else { } else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
} }
} else if (4 == g_stConfInfo.checkMode) { } else if (4 == g_stConfInfo.checkMode) {
if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) ||
|| ((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) ((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) ||
|| ((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) ((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) ||
|| ((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) { ((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) {
printf("success"); printf("success");
} else { } else {
printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
} }
} else { } else {
printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
} }
} }
return 0; return 0;
......
...@@ -678,7 +678,7 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { ...@@ -678,7 +678,7 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
if (tt < 0) tt = 0; if (tt < 0) tt = 0;
#endif #endif
tp = localtime(&tt); tp = taosLocalTime(&tt, NULL);
strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", tp); strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", tp);
if (precision == TSDB_TIME_PRECISION_MILLI) { if (precision == TSDB_TIME_PRECISION_MILLI) {
sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000)); sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000));
......
...@@ -452,7 +452,7 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) { ...@@ -452,7 +452,7 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) {
} }
} }
struct tm *ptm = localtime(&tt); struct tm *ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册