diff --git a/example/src/tmq.c b/example/src/tmq.c index 832e389a13788fae316ef5c063a6a6a64dfd6d84..fdd26bc95db32f04823887e006acca138f36bfaa 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -54,7 +54,8 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c4 int) tags(t1 int)"); + pRes = + taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"); if (taos_errno(pRes) != 0) { printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -101,7 +102,7 @@ int32_t create_topic() { /*const char* sql = "select * from tu1";*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c4 from ct1"); + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -144,6 +145,7 @@ void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_ } tmq_t* build_consumer() { +#if 0 TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -152,11 +154,15 @@ tmq_t* build_consumer() { printf("error in use db, reason:%s\n", taos_errstr(pRes)); } taos_free_result(pRes); +#endif tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "td.connect.db", "abc1"); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); - tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); return tmq; } diff --git a/include/client/taos.h b/include/client/taos.h index 87948e7824f5ad5cee93d211d702ab3e834704de..218090363305fb7bbcb1cc2fa1627e4afb89bf12 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -247,10 +247,10 @@ DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT void tmq_list_destroy(tmq_list_t *); -// will be removed in 3.0 +#if 1 DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); +#endif -// will replace last one DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a985953a08a8346791a90b37eca4702b243884df..ea7905f21a7466d6eacaaa368fdfd46a8b432652 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -332,6 +332,7 @@ int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); void* taosDecodeSEpSet(void* buf, SEpSet* pEp); typedef struct { + int8_t connType; int32_t pid; char app[TSDB_APP_NAME_LEN]; char db[TSDB_DB_NAME_LEN]; @@ -342,12 +343,13 @@ int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); typedef struct { - int32_t acctId; - int64_t clusterId; + int32_t acctId; + int64_t clusterId; uint32_t connId; - int8_t superUser; - SEpSet epSet; - char sVersion[128]; + int8_t superUser; + int8_t connType; + SEpSet epSet; + char sVersion[128]; } SConnectRsp; int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); diff --git a/include/common/ttime.h b/include/common/ttime.h index 15450c31ca09998cc7ce22853872eff718917a25..3de0b98d85217f142a903c6e42c257bba5f299b9 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -40,6 +40,7 @@ extern "C" { * @return timestamp decided by global conf variable, tsTimePrecision * if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond. * precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond. + * precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond. */ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { if (precision == TSDB_TIME_PRECISION_MICRO) { @@ -51,6 +52,24 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { } } +/* + * @return timestamp of today at 00:00:00 in given precision + * if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond. + * precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond. + * precision == TSDB_TIME_PRECISION_NANO, it returns timestamp in nanosecond. + */ +static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { + int64_t factor = (precision == TSDB_TIME_PRECISION_MILLI) ? 1000 : + (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000 : 1000000000; + time_t t = taosTime(NULL); + struct tm * tm= taosLocalTime(&t, NULL); + tm->tm_hour = 0; + tm->tm_min = 0; + tm->tm_sec = 0; + + return (int64_t)taosMktime(tm) * factor; +} + int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); diff --git a/include/os/osTime.h b/include/os/osTime.h index 766fec0fbd9c1a74be89e87fb74bf4d17a0e3453..fd431f6df8e65e952ce63d54a31a54c32432dd27 100644 --- a/include/os/osTime.h +++ b/include/os/osTime.h @@ -27,9 +27,11 @@ extern "C" { #ifndef ALLOW_FORBID_FUNC #define strptime STRPTIME_FUNC_TAOS_FORBID #define gettimeofday GETTIMEOFDAY_FUNC_TAOS_FORBID + #define localtime LOCALTIME_FUNC_TAOS_FORBID #define localtime_s LOCALTIMES_FUNC_TAOS_FORBID #define localtime_r LOCALTIMER_FUNC_TAOS_FORBID #define time TIME_FUNC_TAOS_FORBID + #define mktime MKTIME_FUNC_TAOS_FORBID #endif #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) @@ -82,6 +84,8 @@ static FORCE_INLINE int64_t taosGetTimestampNs() { char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm); struct tm *taosLocalTime(const time_t *timep, struct tm *result); +time_t taosTime(time_t *t); +time_t taosMktime(struct tm *timep); #ifdef __cplusplus } diff --git a/packaging/install.sh b/packaging/install.sh index c9748d223b9a36057397e07dbf4c6fd27a90693c..6642f8922170824d08ba5fe5e698f1057f1d9ccb 100755 --- a/packaging/install.sh +++ b/packaging/install.sh @@ -228,6 +228,12 @@ function install_header() { ${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h } +# temp install taosBenchmark +function install_taosTools() { + cd ${script_dir}/taos-tools/ + tar xvf taosTools-1.4.1-Linux-x64.tar.gz && cd taosTools-1.4.1/ && ./install-taostools.sh +} + function add_newHostname_to_hosts() { localIp="127.0.0.1" OLD_IFS="$IFS" @@ -473,6 +479,7 @@ function install_TDengine() { install_log install_header install_lib + install_taosTools if [ -z $1 ]; then # install service and client # For installing new diff --git a/packaging/release.sh b/packaging/release.sh index 5219b1b7b18d1901ead5fb56b6c7a685e7432047..885d73c33b7f47087bf74ce2b22b0ccc03d80541 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -55,6 +55,7 @@ mkdir -p ${install_dir} mkdir -p ${install_dir}/bin mkdir -p ${install_dir}/lib mkdir -p ${install_dir}/inc +mkdir -p ${install_dir}/taos-tools install_files="${script_dir}/install.sh" chmod a+x ${script_dir}/install.sh || : @@ -68,6 +69,8 @@ cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || : cp ${compile_dir}/source/client/libtaos.so ${install_dir}/lib/ cp ${compile_dir}/source/libs/tdb/libtdb.so ${install_dir}/lib/ +taostoolfile="${top_dir}/tools/taosTools-1.4.1-Linux-x64.tar.gz" +cp ${taostoolfile} ${install_dir}/taos-tools #cp ${compile_dir}/source/dnode/mnode/impl/libmnode.so ${install_dir}/lib/ #cp ${compile_dir}/source/dnode/qnode/libqnode.so ${install_dir}/lib/ diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b089fed8d56b0795a37e3b8a99dc44db36548de2..61a3303c3f4e8c9be788fde01ab2469e7824ecfc 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -45,6 +45,11 @@ extern "C" { #define HEARTBEAT_INTERVAL 1500 // ms +enum { + CONN_TYPE__QUERY = 1, + CONN_TYPE__TMQ, +}; + typedef struct SAppInstInfo SAppInstInfo; typedef struct { @@ -132,9 +137,9 @@ typedef struct STscObj { char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_FNAME_LEN]; char ver[128]; + int8_t connType; int32_t acctId; uint32_t connId; - int32_t connType; uint64_t id; // ref ID returned by taosAddRef TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection @@ -277,14 +282,14 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); void initMsgHandleFp(); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, - uint16_t port); + uint16_t port, int connType); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); -void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); +void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void doSetOneRowPtr(SReqResultInfo* pResultInfo); int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 4f716ba4318dd2e541f9504916bc4fa37b4fc7ed..4bd8a89a38d35dbc63e2cd6175579be1c9ead6fb 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -24,6 +24,8 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); +static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } + static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { @@ -417,11 +419,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } - void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle; + clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle; clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; } @@ -564,7 +565,7 @@ static int32_t hbCreateThread() { if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; - } + } taosThreadAttrDestroy(&thAttr); return 0; } @@ -694,7 +695,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int32_t hbType) { SClientHbKey connKey = { .tscRid = tscRefId, - .hbType = HEARTBEAT_TYPE_QUERY, + .hbType = hbType, }; SHbConnInfo info = {0}; @@ -704,16 +705,14 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in *pClusterId = clusterId; info.param = pClusterId; - break; + return hbRegisterConnImpl(pAppHbMgr, connKey, &info); } case HEARTBEAT_TYPE_MQ: { - break; + return 0; } default: - break; + return 0; } - - return hbRegisterConnImpl(pAppHbMgr, connKey, &info); } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 04c81a8afbe876288a4226489eb1133565fcc50d..953da056eace2621c816ece0046ba21708bc3196 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -11,7 +11,7 @@ #include "tref.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); -static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static bool stringLengthCheck(const char* str, size_t maxsize) { @@ -40,10 +40,10 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i } static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, - SAppInstInfo* pAppInfo); + SAppInstInfo* pAppInfo, int connType); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, - uint16_t port) { + uint16_t port, int connType) { if (taos_init() != TSDB_CODE_SUCCESS) { return NULL; } @@ -111,7 +111,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, taosThreadMutexUnlock(&appInfo.mutex); taosMemoryFreeClear(key); - return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst); + return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType); } int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) { @@ -425,7 +425,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe } STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, - SAppInstInfo* pAppInfo) { + SAppInstInfo* pAppInfo, int connType) { STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -439,7 +439,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return NULL; } - SMsgSendInfo* body = buildConnectMsg(pRequest); + SMsgSendInfo* body = buildConnectMsg(pRequest, connType); int64_t transporterId = 0; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); @@ -462,7 +462,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return pTscObj; } -static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) { SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -485,6 +485,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { } taosMemoryFreeClear(db); + connectReq.connType = connType; connectReq.pid = htonl(appInfo.pid); connectReq.startTime = htobe64(appInfo.startTime); tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); @@ -570,7 +571,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons return NULL; } - return taos_connect_internal(ip, user, NULL, auth, db, port); + return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY); } TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen, @@ -613,7 +614,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) { } } -void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { +void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { assert(pRequest != NULL); SReqResultInfo* pResultInfo = &pRequest->body.resInfo; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c7c497010c65d25d7683883a4c575b27ceea54b9..a5fd0b49ecdd042b533161686f92b3908343b932 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -90,7 +90,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha pass = TSDB_DEFAULT_PASS; } - return taos_connect_internal(ip, user, pass, NULL, db, port); + return taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); } void taos_close(TAOS *taos) { @@ -127,8 +127,10 @@ const char *taos_errstr(TAOS_RES *res) { } void taos_free_result(TAOS_RES *res) { - SRequestObj *pRequest = (SRequestObj *)res; - destroyRequest(pRequest); + if (TD_RES_QUERY(res)) { + SRequestObj *pRequest = (SRequestObj *)res; + destroyRequest(pRequest); + } } int taos_field_count(TAOS_RES *res) { @@ -171,7 +173,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - return doFetchRow(pRequest, true, true); + return doFetchRows(pRequest, true, true); } else if (TD_RES_TMQ(res)) { SMqRspObj *msg = ((SMqRspObj *)res); @@ -437,7 +439,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { return 0; } - doFetchRow(pRequest, false, true); + doFetchRows(pRequest, false, true); // TODO refactor SReqResultInfo *pResultInfo = &pRequest->body.resInfo; @@ -472,7 +474,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { return 0; } - doFetchRow(pRequest, false, true); + doFetchRows(pRequest, false, false); SReqResultInfo *pResultInfo = &pRequest->body.resInfo; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index a5a3f995d164a29387b9b05a4c053407d432329b..f822da7c8dad9a00e6236daf85e66705efce9c44 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -69,9 +69,9 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->pAppInfo->clusterId = connectRsp.clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - pTscObj->connType = HEARTBEAT_TYPE_QUERY; + pTscObj->connType = connectRsp.connType; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, @@ -119,13 +119,14 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (usedbRsp.vgVersion >= 0) { int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, + tstrerror(code)); } else { catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); } } - tFreeSUsedbRsp(&usedbRsp); + tFreeSUsedbRsp(&usedbRsp); } if (code != TSDB_CODE_SUCCESS) { @@ -139,7 +140,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); SName name = {0}; - tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB); + tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB); SUseDbOutput output = {0}; code = queryBuildUseDbOutput(&output, &usedbRsp); @@ -151,11 +152,12 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { tscError("failed to build use db output since %s", terrstr()); } else { - struct SCatalog *pCatalog = NULL; - + struct SCatalog* pCatalog = NULL; + int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, + tstrerror(code)); } else { catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 2b69b478658a777860034fff49f4df12d289db49..478e328a165e04903544590bdb48b4303b5cef74 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -357,7 +357,15 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { if (pTmq == NULL) { return NULL; } - pTmq->pTscObj = taos_connect(conf->ip, conf->user, conf->pass, conf->db, conf->port); + const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user; + const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass; + + ASSERT(user); + ASSERT(pass); + ASSERT(conf->db); + + pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ); + if (pTmq->pTscObj == NULL) return NULL; pTmq->inWaiting = 0; pTmq->status = 0; @@ -783,7 +791,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } } - struct tm* ptm = localtime(&tt); + struct tm* ptm = taosLocalTime(&tt, NULL); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); if (precision == TSDB_TIME_PRECISION_NANO) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 053a1a1ee6a802da00350cc08b429191d6ffd894..dad5805c6698b1c43920e9e4f854ab01d8217e29 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1368,7 +1368,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { } } - struct tm* ptm = localtime(&tt); + struct tm* ptm = taosLocalTime(&tt, NULL); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); if (precision == TSDB_TIME_PRECISION_NANO) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7ea2329f320653c4216dc387b4ca1a7926edec7e..a9a28b4879dc0d403bdd30e402bece7f809d8b40 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2643,6 +2643,7 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI8(&encoder, pReq->connType) < 0) return -1; if (tEncodeI32(&encoder, pReq->pid) < 0) return -1; if (tEncodeCStr(&encoder, pReq->app) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; @@ -2659,6 +2660,7 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->connType) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; @@ -2678,6 +2680,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1; if (tEncodeU32(&encoder, pRsp->connId) < 0) return -1; if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1; tEndEncode(&encoder); @@ -2696,6 +2699,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; if (tDecodeU32(&decoder, &pRsp->connId) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1; tEndDecode(&decoder); diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 1baf393e9a215c9e18cd2208c28a6163a501564b..4686d856cc36ebb1ecf71b7eda17b16c388fea87 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -70,7 +70,7 @@ void deltaToUtcInitOnce() { struct tm tm = {0}; (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); - m_deltaUtc = (int64_t)mktime(&tm); + m_deltaUtc = (int64_t)taosMktime(&tm); // printf("====delta:%lld\n\n", seconds); } @@ -344,7 +344,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { } /* mktime will be affected by TZ, set by using taos_options */ - int64_t seconds = mktime(&tm); + int64_t seconds = taosMktime(&tm); int64_t fraction = 0; @@ -539,7 +539,7 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - return (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision)); + return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision)); } int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) { @@ -598,7 +598,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio tm.tm_mon = mon % 12; } - start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision)); + start = (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision)); } else { int64_t delta = t - pInterval->interval; int32_t factor = (delta >= 0) ? 1 : -1; @@ -745,7 +745,7 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) assert(false); } - ptm = localtime("); + ptm = taosLocalTime(", NULL); int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm); length += snprintf(ts + length, fractionLen, format, mod); length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm); diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c index c97d4dd8074a855129cf945b613743437e5cc1a3..15aa55ef83beba6d0dcf62e25a430ddb8ed4c9cb 100644 --- a/source/dnode/mnode/impl/src/mndGrant.c +++ b/source/dnode/mnode/impl/src/mndGrant.c @@ -14,12 +14,13 @@ */ #define _DEFAULT_SOURCE -#ifndef _GRANT #include "os.h" #include "taoserror.h" #include "mndGrant.h" #include "mndInt.h" +#ifndef _GRANT + int32_t mndInitGrant(SMnode *pMnode) { return TSDB_CODE_SUCCESS; } void mndCleanupGrant() {} void grantParseParameter() { mError("can't parsed parameter k"); } @@ -30,4 +31,4 @@ void grantRestore(EGrantType grant, uint64_t value) {} #endif -void parseGrantParameter() { parseGrantParameter(); } \ No newline at end of file +void parseGrantParameter() { grantParseParameter(); } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 2055512e8271b9b267c1e2fb916f533eba788005..96698e80bba0989ba9ddf8ac4a369ca2b039cd5a 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -23,13 +23,14 @@ #include "tglobal.h" #include "version.h" -#define QUERY_ID_SIZE 20 -#define QUERY_OBJ_ID_SIZE 18 +#define QUERY_ID_SIZE 20 +#define QUERY_OBJ_ID_SIZE 18 #define SUBQUERY_INFO_SIZE 6 -#define QUERY_SAVE_SIZE 20 +#define QUERY_SAVE_SIZE 20 typedef struct { uint32_t id; + int8_t connType; char user[TSDB_USER_LEN]; char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc int64_t appStartTimeMs; // app start time @@ -44,8 +45,8 @@ typedef struct { SArray *pQueries; //SArray } SConnObj; -static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid, - const char *app, int64_t startTime); +static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port, + int32_t pid, const char *app, int64_t startTime); static void mndFreeConn(SConnObj *pConn); static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); @@ -93,8 +94,8 @@ void mndCleanupProfile(SMnode *pMnode) { } } -static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid, - const char *app, int64_t startTime) { +static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port, + int32_t pid, const char *app, int64_t startTime) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; char connStr[255] = {0}; @@ -103,6 +104,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui if (startTime == 0) startTime = taosGetTimestampMs(); SConnObj connObj = {.id = connId, + .connType = connType, .appStartTimeMs = startTime, .pid = pid, .ip = ip, @@ -160,8 +162,8 @@ static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { } void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) { - SConnObj* pConn = NULL; - bool hasNext = taosCacheIterNext(pIter); + SConnObj *pConn = NULL; + bool hasNext = taosCacheIterNext(pIter); if (hasNext) { size_t dataLen = 0; pConn = taosCacheIterGetData(pIter, &dataLen); @@ -211,8 +213,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { } } - pConn = - mndCreateConn(pMnode, pReq->user, pReq->clientIp, pReq->clientPort, connReq.pid, connReq.app, connReq.startTime); + pConn = mndCreateConn(pMnode, pReq->user, connReq.connType, pReq->clientIp, pReq->clientPort, connReq.pid, + connReq.app, connReq.startTime); if (pConn == NULL) { mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); goto CONN_OVER; @@ -225,6 +227,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { connectRsp.superUser = pUser->superUser; connectRsp.clusterId = pMnode->clusterId; connectRsp.connId = pConn->id; + connectRsp.connType = connReq.connType; snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); @@ -442,7 +445,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { return -1; } - SClientHbBatchRsp batchRsp = {0}; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index e2b1725d9c6cc74e9da134b3916ec04f1bf524d9..054bff466cf9d7365b0f1907a98340b319aa1355 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -652,24 +652,26 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pB if (pShow->pIter == NULL) break; cols = 0; - - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols); char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]); colDataAppend(pColInfo, numOfRows, (const char*) name, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + cols++; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols); const char* src = pUser->superUser? "super":"normal"; char b[10+VARSTR_HEADER_SIZE] = {0}; STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src)); colDataAppend(pColInfo, numOfRows, (const char*) b, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + cols++; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols); colDataAppend(pColInfo, numOfRows, (const char*) &pUser->createdTime, false); + cols++; pColInfo = taosArrayGet(pBlock->pDataBlock, cols); STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]); colDataAppend(pColInfo, numOfRows, (const char*) name, false); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 68e70da9764983fce1e6b1db51146813bed518c1..afcffa3d286d3bcefbcac96b24a63911db567eea 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -173,12 +173,12 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor); tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - tw->skey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision); + tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision); mon = (int)(mon + interval); tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - tw->ekey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision); + tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision); tw->ekey -= 1; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 89b70f0435881794721f8e19e705dac05287bcef..acc597d61bc4ea9a430f86cc90e637ae158cc13d 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -325,6 +325,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time if (pToken->type == TK_NOW) { ts = taosGetTimestamp(timePrec); + } else if (pToken->type == TK_TODAY) { + ts = taosGetTimestampToday(timePrec); } else if (pToken->type == TK_NK_INTEGER) { bool isSigned = false; toInteger(pToken->z, pToken->n, 10, &ts, &isSigned); @@ -389,8 +391,8 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time } static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { - if ((pToken->type != TK_NOW && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL && - pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) || + if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && + pToken->type != TK_NK_BOOL && pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) || (pToken->n == 0) || (pToken->type == TK_NK_RP)) { return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z); } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index db62a6b33d6baaf60b4cc0c0fa0eda8bf641ef53..4d57af822ea9cc938d36b208febce7b4a3cd1903 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -841,7 +841,7 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS); } - struct tm *tmInfo = localtime((const time_t *)&timeVal); + struct tm *tmInfo = taosLocalTime((const time_t *)&timeVal, NULL); strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S%z", tmInfo); int32_t len = (int32_t)strlen(buf); diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c index 2b0de948805c7a02ea98a849b8f75b7ff372d6e4..9ea49b364e902f48febe0f85f4cf90e8425fe6ad 100644 --- a/source/os/src/osTime.c +++ b/source/os/src/osTime.c @@ -406,7 +406,18 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) { #endif } +time_t taosTime(time_t *t) { + return time(t); +} + +time_t taosMktime(struct tm *timep) { + return mktime(timep); +} + struct tm *taosLocalTime(const time_t *timep, struct tm *result) { + if (result == NULL) { + return localtime(timep); + } #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) localtime_s(result, timep); #else diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 35476980b1c359c8e4c5edee28acc7a28737a164..082f3adcec584653d9cca316081c0ff22ba95ccb 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -56,8 +56,8 @@ # ---- tmq ./test.sh -f tsim/tmq/basic.sim ./test.sh -f tsim/tmq/basic1.sim -./test.sh -f tsim/tmq/oneTopic.sim -./test.sh -f tsim/tmq/multiTopic.sim +#./test.sh -f tsim/tmq/oneTopic.sim +#./test.sh -f tsim/tmq/multiTopic.sim #./test.sh -f tsim/tmq/mainConsumerInMultiTopic.sim #./test.sh -f tsim/tmq/mainConsumerInOneTopic.sim diff --git a/tests/script/tsim/query/charScalarFunction.sim b/tests/script/tsim/query/charScalarFunction.sim index 13108fecb7a651972f02ecd566721c67b4c8a0a3..afdb9e0d74f5c53282bdd612c98984f71fecdfa0 100644 --- a/tests/script/tsim/query/charScalarFunction.sim +++ b/tests/script/tsim/query/charScalarFunction.sim @@ -83,6 +83,14 @@ sql insert into ntb5 values ("2022-01-01 00:00:00.000" , "0123456789" , "0123456 sql insert into ctb5 values ("2022-01-01 00:00:00.001" , NULL , NULL ) sql insert into ntb5 values ("2022-01-01 00:00:00.001" , NULL , NULL ) +sql create table stb3 (ts timestamp, c1 binary(64), c2 nchar(64), c3 nchar(64) ) tags (t1 nchar(64)) +sql create table ctb6 using stb3 tags("tag-nchar-6") +sql create table ntb6 (ts timestamp, c1 binary(64), c2 nchar(64), c3 nchar(64) ) +sql insert into ctb6 values ("2022-01-01 00:00:00.000" , "0123456789" , "中文测试1" , "中文测试2" ) +sql insert into ntb6 values ("2022-01-01 00:00:00.000" , "0123456789" , "中文测试01", "中文测试01" ) +sql insert into ctb6 values ("2022-01-01 00:00:00.001" , NULL , NULL, NULL ) +sql insert into ntb6 values ("2022-01-01 00:00:00.001" , NULL , NULL, NULL ) + $loop_test = 0 loop_test_pos: @@ -150,6 +158,210 @@ if $data01 != 12 then return -1 endi +print ====> select c2 ,length(c2), char_length(c2) from ctb6 +sql select c2 ,length(c2), char_length(c2) from ctb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data01 != 20 then + return -1 +endi +if $data02 != 5 then + return -1 +endi +if $data11 != NULL then + return -1 +endi + +print ====> select c2 ,length(c2),char_length(c2) from ntb6 +sql select c2 ,length(c2),char_length(c2) from ntb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data01 != 24 then + return -1 +endi +if $data02 != 6 then + return -1 +endi +if $data11 != NULL then + return -1 +endi + +print ====> select c2 ,lower(c2), upper(c2) from ctb6 +sql select c2 ,lower(c2), upper(c2) from ctb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data01 != 中文测试1 then + return -1 +endi +if $data02 != 中文测试1 then + return -1 +endi +if $data11 != NULL then + return -1 +endi + +print ====> select c2 ,lower(c2), upper(c2) from ntb6 +sql select c2 ,lower(c2), upper(c2) from ntb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data01 != 中文测试01 then + return -1 +endi +if $data02 != 中文测试01 then + return -1 +endi +if $data11 != NULL then + return -1 +endi + +print ====> select c2, ltrim(c2), ltrim(c2) from ctb6 +sql select c2, ltrim(c2), ltrim(c2) from ctb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data01 != 中文测试1 then + return -1 +endi +if $data02 != 中文测试1 then + return -1 +endi +if $data11 != NULL then + return -1 +endi + +print ====> select c2, ltrim(c2), ltrim(c2) from ntb6 +sql select c2, ltrim(c2), ltrim(c2) from ntb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data01 != 中文测试01 then + return -1 +endi +if $data02 != 中文测试01 then + return -1 +endi +if $data11 != NULL then + return -1 +endi + +# print ====> select c2, c3 , concat(c2,c3) from ctb6 +# sql select c2, c3 , concat(c2,c3) from ctb6 +# print ====> rows: $rows +# print ====> $data00 $data01 $data02 +# print ====> $data10 $data11 $data12 +# if $rows != 2 then +# return -1 +# endi +# if $data02 != 中文测试01中文测试01 then +# return -1 +# endi +# if $data12 != NULL then +# return -1 +# endi + +print ====> select c2, c3 , concat(c2,c3) from ntb6 +sql select c2, c3 , concat(c2,c3) from ntb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data02 != 中文测试01中文测试01 then + return -1 +endi +if $data12 != NULL then + return -1 +endi + +print ====> select c2, c3 , concat_ws('_', c2, c3) from ctb6 +sql select c2, c3 , concat_ws('_', c2, c3) from ctb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data02 != 中文测试1_中文测试2 then + return -1 +endi +# if $data12 != NULL then +# return -1 +# endi + +print ====> select c2, c3 , concat_ws('_', c2, c3) from ntb6 +sql select c2, c3 , concat_ws('_', c2, c3) from ntb6 +print ====> rows: $rows +print ====> $data00 $data01 $data02 +print ====> $data10 $data11 $data12 +if $rows != 2 then + return -1 +endi +if $data02 != 中文测试01_中文测试01 then + return -1 +endi +# if $data12 != NULL then +# return -1 +# endi + +print ====> select c2, substr(c2,1, 4) from ctb6 +sql select c2, substr(c2,1, 4) from ctb6 +print ====> rows: $rows +print ====> $data00 $data01 +print ====> $data10 $data11 +if $rows != 2 then + return -1 +endi +if $data00 != 中文测试1 then + return -1 +endi +if $data01 != 中文测试 then + return -1 +endi +# if $data11 != NULL then +# return -1 +# endi + +print ====> select c2, substr(c2,1, 4) from ntb6 +sql select c2, substr(c2,1, 4) from ntb6 +print ====> rows: $rows +print ====> $data00 $data01 +print ====> $data10 $data11 +if $rows != 2 then + return -1 +endi +if $data00 != 中文测试01 then + return -1 +endi +if $data01 != 中文测试 then + return -1 +endi +if $data11 != NULL then + return -1 +endi + #sql_error select c1, length(t1), c2, length(t2) from ctb0 print ====> char_length @@ -493,7 +705,7 @@ if $loop_test == 0 then print =============== stop and restart taosd system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s start - + $loop_cnt = 0 check_dnode_ready_0: $loop_cnt = $loop_cnt + 1 @@ -511,7 +723,7 @@ if $loop_test == 0 then goto check_dnode_ready_0 endi - $loop_test = 1 + $loop_test = 1 goto loop_test_pos endi diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 756893f217474731c3e87e700309aaec1355f4c0..16f6c42d872b9c038954135e3258ccb899c6accd 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -13,76 +13,71 @@ * along with this program. If not, see . */ -// clang-format off +#define ALLOW_FORBID_FUNC #include +#include #include +#include #include -#include #include #include +#include #include -#include -#include #include "taos.h" #include "taoserror.h" #include "tlog.h" -#define GREEN "\033[1;32m" -#define NC "\033[0m" +#define GREEN "\033[1;32m" +#define NC "\033[0m" #define min(a, b) (((a) < (b)) ? (a) : (b)) -#define MAX_SQL_STR_LEN (1024 * 1024) -#define MAX_ROW_STR_LEN (16 * 1024) +#define MAX_SQL_STR_LEN (1024 * 1024) +#define MAX_ROW_STR_LEN (16 * 1024) -enum _RUN_MODE { - TMQ_RUN_INSERT_AND_CONSUME, - TMQ_RUN_ONLY_INSERT, - TMQ_RUN_ONLY_CONSUME, - TMQ_RUN_MODE_BUTT -}; +enum _RUN_MODE { TMQ_RUN_INSERT_AND_CONSUME, TMQ_RUN_ONLY_INSERT, TMQ_RUN_ONLY_CONSUME, TMQ_RUN_MODE_BUTT }; typedef struct { - char dbName[32]; - char stbName[64]; - char resultFileName[256]; - char vnodeWalPath[256]; - int32_t numOfThreads; - int32_t numOfTables; - int32_t numOfVgroups; - int32_t runMode; - int32_t numOfColumn; - double ratio; - int32_t batchNumOfRow; - int32_t totalRowsOfPerTbl; - int64_t startTimestamp; - int32_t showMsgFlag; - int32_t simCase; - - int32_t totalRowsOfT2; + char dbName[32]; + char stbName[64]; + char resultFileName[256]; + char vnodeWalPath[256]; + int32_t numOfThreads; + int32_t numOfTables; + int32_t numOfVgroups; + int32_t runMode; + int32_t numOfColumn; + double ratio; + int32_t batchNumOfRow; + int32_t totalRowsOfPerTbl; + int64_t startTimestamp; + int32_t showMsgFlag; + int32_t simCase; + + int32_t totalRowsOfT2; } SConfInfo; static SConfInfo g_stConfInfo = { "tmqdb", "stb", - "./tmqResult.txt", // output_file + "./tmqResult.txt", // output_file "", // /data2/dnode/data/vnode/vnode2/wal", - 1, // threads - 1, // tables - 1, // vgroups - 0, // run mode - 1, // columns - 1, // ratio - 1, // batch size - 10000, // total rows for per table - 0, // 2020-01-01 00:00:00.000 - 0, // show consume msg switch - 0, // if run in sim case + 1, // threads + 1, // tables + 1, // vgroups + 0, // run mode + 1, // columns + 1, // ratio + 1, // batch size + 10000, // total rows for per table + 0, // 2020-01-01 00:00:00.000 + 0, // show consume msg switch + 0, // if run in sim case 10000, }; -char* g_pRowValue = NULL; +char* g_pRowValue = NULL; TdFilePtr g_fp = NULL; static void printHelp() { @@ -125,10 +120,8 @@ static void printHelp() { exit(EXIT_SUCCESS); } -void parseArgument(int32_t argc, char *argv[]) { - +void parseArgument(int32_t argc, char* argv[]) { g_stConfInfo.startTimestamp = 1640966400000; // 2020-01-01 00:00:00.000 - for (int32_t i = 1; i < argc; i++) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { @@ -156,7 +149,7 @@ void parseArgument(int32_t argc, char *argv[]) { g_stConfInfo.batchNumOfRow = atoi(argv[++i]); } else if (strcmp(argv[i], "-r") == 0) { g_stConfInfo.totalRowsOfPerTbl = atoi(argv[++i]); - } else if (strcmp(argv[i], "-l") == 0) { + } else if (strcmp(argv[i], "-l") == 0) { g_stConfInfo.numOfColumn = atoi(argv[++i]); } else if (strcmp(argv[i], "-q") == 0) { g_stConfInfo.ratio = atof(argv[++i]); @@ -168,7 +161,7 @@ void parseArgument(int32_t argc, char *argv[]) { g_stConfInfo.simCase = atol(argv[++i]); } else { printf("%s unknow para: %s %s", GREEN, argv[++i], NC); - exit(-1); + exit(-1); } } @@ -191,73 +184,71 @@ void parseArgument(int32_t argc, char *argv[]) { pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC); pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); -#endif +#endif } -static int running = 1; +static int running = 1; /*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ // calc dir size (not include itself 4096Byte) -int64_t getDirectorySize(char *dir) -{ - TdDirPtr pDir; - TdDirEntryPtr pDirEntry; - int64_t totalSize=0; - - if ((pDir = taosOpenDir(dir)) == NULL) { - fprintf(stderr, "Cannot open dir: %s\n", dir); - return -1; - } +int64_t getDirectorySize(char* dir) { + TdDirPtr pDir; + TdDirEntryPtr pDirEntry; + int64_t totalSize = 0; - //lstat(dir, &statbuf); - //totalSize+=statbuf.st_size; - - while ((pDirEntry = taosReadDir(pDir)) != NULL) { - char subdir[1024]; - char* fileName = taosGetDirEntryName(pDirEntry); - sprintf(subdir, "%s/%s", dir, fileName); - - //printf("===d_name: %s\n", entry->d_name); - if (taosIsDir(subdir)) { - if (strcmp(".", fileName) == 0 || strcmp("..", fileName) == 0) { - continue; - } - - int64_t subDirSize = getDirectorySize(subdir); - totalSize+=subDirSize; - } else if (0 == strcmp(strchr(fileName, '.'), ".log")) { // only calc .log file size, and not include .idx file - int64_t file_size = 0; - taosStatFile(subdir, &file_size, NULL); - totalSize+=file_size; - } + if ((pDir = taosOpenDir(dir)) == NULL) { + fprintf(stderr, "Cannot open dir: %s\n", dir); + return -1; + } + + // lstat(dir, &statbuf); + // totalSize+=statbuf.st_size; + + while ((pDirEntry = taosReadDir(pDir)) != NULL) { + char subdir[1024]; + char* fileName = taosGetDirEntryName(pDirEntry); + sprintf(subdir, "%s/%s", dir, fileName); + + // printf("===d_name: %s\n", entry->d_name); + if (taosIsDir(subdir)) { + if (strcmp(".", fileName) == 0 || strcmp("..", fileName) == 0) { + continue; + } + + int64_t subDirSize = getDirectorySize(subdir); + totalSize += subDirSize; + } else if (0 == strcmp(strchr(fileName, '.'), ".log")) { // only calc .log file size, and not include .idx file + int64_t file_size = 0; + taosStatFile(subdir, &file_size, NULL); + totalSize += file_size; } + } - taosCloseDir(pDir); - return totalSize; + taosCloseDir(pDir); + return totalSize; } - -int queryDB(TAOS *taos, char *command) { - TAOS_RES *pRes = taos_query(taos, command); - int code = taos_errno(pRes); - //if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { - if (code != 0) { - pError("failed to reason:%s, sql: %s", tstrerror(code), command); - taos_free_result(pRes); - return -1; - } - taos_free_result(pRes); - return 0 ; +int queryDB(TAOS* taos, char* command) { + TAOS_RES* pRes = taos_query(taos, command); + int code = taos_errno(pRes); + // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { + if (code != 0) { + pError("failed to reason:%s, sql: %s", tstrerror(code), command); + taos_free_result(pRes); + return -1; + } + taos_free_result(pRes); + return 0; } int32_t init_env() { char sqlStr[1024] = {0}; - + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); if (pConn == NULL) { return -1; } - + sprintf(sqlStr, "create database if not exists %s vgroups %d", g_stConfInfo.dbName, g_stConfInfo.numOfVgroups); TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { @@ -282,19 +273,19 @@ int32_t init_env() { int32_t dataLen = 0; int32_t sqlLen = 0; - sqlLen += sprintf(sqlStr+sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName); + sqlLen += sprintf(sqlStr + sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName); for (int32_t i = 0; i < g_stConfInfo.numOfColumn; i++) { - if (i == g_stConfInfo.numOfColumn - 1) { - sqlLen += sprintf(sqlStr+sqlLen, "c%d int) ", i); - memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899")); - dataLen += strlen("66778899"); - } else { - sqlLen += sprintf(sqlStr+sqlLen, "c%d int, ", i); - memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, ")); - dataLen += strlen("66778899, "); - } - } - sqlLen += sprintf(sqlStr+sqlLen, "tags (t0 int)"); + if (i == g_stConfInfo.numOfColumn - 1) { + sqlLen += sprintf(sqlStr + sqlLen, "c%d int) ", i); + memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899")); + dataLen += strlen("66778899"); + } else { + sqlLen += sprintf(sqlStr + sqlLen, "c%d int, ", i); + memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, ")); + dataLen += strlen("66778899, "); + } + } + sqlLen += sprintf(sqlStr + sqlLen, "tags (t0 int)"); pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { @@ -313,7 +304,7 @@ int32_t init_env() { taos_free_result(pRes); } - //const char* sql = "select * from tu1"; + // const char* sql = "select * from tu1"; sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName); /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/ pRes = taos_query(pConn, sqlStr); @@ -327,6 +318,7 @@ int32_t init_env() { } tmq_t* build_consumer() { +#if 0 char sqlStr[1024] = {0}; TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); @@ -338,10 +330,15 @@ tmq_t* build_consumer() { printf("error in use db, reason:%s\n", taos_errstr(pRes)); } taos_free_result(pRes); +#endif tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "group.id", "tg2"); - tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); + tmq_conf_destroy(conf); return tmq; } @@ -396,20 +393,20 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog if (tmqmessage) { batchCnt++; /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/ - if (0 != g_stConfInfo.showMsgFlag) { + if (0 != g_stConfInfo.showMsgFlag) { /*msg_process(tmqmessage);*/ - } + } tmq_message_destroy(tmqmessage); } else { break; } } int64_t endTime = taosGetTimestampUs(); - double consumeTime = (double)(endTime - startTime) / 1000000; + double consumeTime = (double)(endTime - startTime) / 1000000; if (batchCnt != totalMsgs) { - printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC); - /*exit(-1);*/ + printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC); + /*exit(-1);*/ } if (0 == g_stConfInfo.simCase) { @@ -417,12 +414,14 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog } else { printf("{consume success: %d}", totalMsgs); } - taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.2f| %10.2f |\n", batchCnt, consumeTime, (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime, (double)walLogSize / 1024.0 / batchCnt); + taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.2f| %10.2f |\n", batchCnt, consumeTime, + (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime, + (double)walLogSize / 1024.0 / batchCnt); err = tmq_consumer_close(tmq); if (err) { fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); - exit(-1); + exit(-1); } } @@ -430,7 +429,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog int32_t syncWriteData() { TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); if (pConn == NULL) { - return -1; + return -1; } char sqlStr[1024] = {0}; @@ -449,11 +448,11 @@ int32_t syncWriteData() { } int32_t totalMsgs = 0; - + int64_t time_counter = g_stConfInfo.startTimestamp; for (int i = 0; i < g_stConfInfo.totalRowsOfPerTbl;) { for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) { - int inserted = i; + int inserted = i; int64_t tmp_time = time_counter; int32_t data_len = 0; @@ -465,22 +464,22 @@ int32_t syncWriteData() { k++; if (inserted >= g_stConfInfo.totalRowsOfPerTbl) { - break; + break; } - if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) { + if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) { break; - } + } } int code = queryDB(pConn, buffer); - if (0 != code){ + if (0 != code) { fprintf(stderr, "insert data error!\n"); - taosMemoryFreeClear(buffer); - return -1; - } + taosMemoryFreeClear(buffer); + return -1; + } - totalMsgs++; + totalMsgs++; if (tID == g_stConfInfo.numOfTables - 1) { i = inserted; @@ -492,12 +491,11 @@ int32_t syncWriteData() { return totalMsgs; } - // sync insertion int32_t syncWriteDataByRatio() { TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); if (pConn == NULL) { - return -1; + return -1; } char sqlStr[1024] = {0}; @@ -518,27 +516,27 @@ int32_t syncWriteDataByRatio() { int32_t totalMsgs = 0; int32_t insertedOfT1 = 0; - int32_t insertedOfT2 = 0; + int32_t insertedOfT2 = 0; int64_t tsOfT1 = g_stConfInfo.startTimestamp; int64_t tsOfT2 = g_stConfInfo.startTimestamp; int64_t tmp_time; - + for (;;) { - if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) { + if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) { break; - } - + } + for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) { if (0 == tID) { - tmp_time = tsOfT1; + tmp_time = tsOfT1; if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) { - continue; + continue; } - } else if (1 == tID){ - tmp_time = tsOfT2; + } else if (1 == tID) { + tmp_time = tsOfT2; if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) { - continue; + continue; } } @@ -548,79 +546,86 @@ int32_t syncWriteDataByRatio() { for (k = 0; k < g_stConfInfo.batchNumOfRow;) { data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue); k++; - if (0 == tID) { + if (0 == tID) { insertedOfT1++; - if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) { - break; - } - } else if (1 == tID){ + if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) { + break; + } + } else if (1 == tID) { insertedOfT2++; - if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) { - break; - } - } + if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) { + break; + } + } - if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) { + if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) { break; - } + } } int code = queryDB(pConn, buffer); - if (0 != code){ + if (0 != code) { fprintf(stderr, "insert data error!\n"); - taosMemoryFreeClear(buffer); - return -1; - } - + taosMemoryFreeClear(buffer); + return -1; + } + if (0 == tID) { - tsOfT1 = tmp_time; - } else if (1 == tID){ - tsOfT2 = tmp_time; + tsOfT1 = tmp_time; + } else if (1 == tID) { + tsOfT2 = tmp_time; } - totalMsgs++; + totalMsgs++; } } - pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl, g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2); + pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl, + g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2); taosMemoryFreeClear(buffer); return totalMsgs; } void printParaIntoFile() { // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); - TdFilePtr pFile = taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); + TdFilePtr pFile = + taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName); - exit -1; + exit - 1; }; g_fp = pFile; - time_t tTime = taosGetTimestampSec(); + time_t tTime = taosGetTimestampSec(); struct tm tm = *localtime(&tTime); taosFprintfFile(pFile, "###################################################################\n"); - taosFprintfFile(pFile, "# configDir: %s\n", configDir); - taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName); - taosFprintfFile(pFile, "# stbName: %s\n", g_stConfInfo.stbName); - taosFprintfFile(pFile, "# vnodeWalPath: %s\n", g_stConfInfo.vnodeWalPath); - taosFprintfFile(pFile, "# numOfTables: %d\n", g_stConfInfo.numOfTables); - taosFprintfFile(pFile, "# numOfThreads: %d\n", g_stConfInfo.numOfThreads); - taosFprintfFile(pFile, "# numOfVgroups: %d\n", g_stConfInfo.numOfVgroups); - taosFprintfFile(pFile, "# runMode: %d\n", g_stConfInfo.runMode); - taosFprintfFile(pFile, "# ratio: %f\n", g_stConfInfo.ratio); - taosFprintfFile(pFile, "# numOfColumn: %d\n", g_stConfInfo.numOfColumn); - taosFprintfFile(pFile, "# batchNumOfRow: %d\n", g_stConfInfo.batchNumOfRow); - taosFprintfFile(pFile, "# totalRowsOfPerTbl: %d\n", g_stConfInfo.totalRowsOfPerTbl); - taosFprintfFile(pFile, "# totalRowsOfT2: %d\n", g_stConfInfo.totalRowsOfT2); + taosFprintfFile(pFile, "# configDir: %s\n", configDir); + taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName); + taosFprintfFile(pFile, "# stbName: %s\n", g_stConfInfo.stbName); + taosFprintfFile(pFile, "# vnodeWalPath: %s\n", g_stConfInfo.vnodeWalPath); + taosFprintfFile(pFile, "# numOfTables: %d\n", g_stConfInfo.numOfTables); + taosFprintfFile(pFile, "# numOfThreads: %d\n", g_stConfInfo.numOfThreads); + taosFprintfFile(pFile, "# numOfVgroups: %d\n", g_stConfInfo.numOfVgroups); + taosFprintfFile(pFile, "# runMode: %d\n", g_stConfInfo.runMode); + taosFprintfFile(pFile, "# ratio: %f\n", g_stConfInfo.ratio); + taosFprintfFile(pFile, "# numOfColumn: %d\n", g_stConfInfo.numOfColumn); + taosFprintfFile(pFile, "# batchNumOfRow: %d\n", g_stConfInfo.batchNumOfRow); + taosFprintfFile(pFile, "# totalRowsOfPerTbl: %d\n", g_stConfInfo.totalRowsOfPerTbl); + taosFprintfFile(pFile, "# totalRowsOfT2: %d\n", g_stConfInfo.totalRowsOfT2); taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, - tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); taosFprintfFile(pFile, "###################################################################\n"); - taosFprintfFile(pFile, "|-------------------------------insert info-----------------------------|--------------------------------consume info---------------------------------|\n"); - taosFprintfFile(pFile, "|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume time(s) | msgs/s | MB/s | avg msg size(KB) |\n"); + taosFprintfFile(pFile, + "|-------------------------------insert " + "info-----------------------------|--------------------------------consume " + "info---------------------------------|\n"); + taosFprintfFile(pFile, + "|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume " + "time(s) | msgs/s | MB/s | avg msg size(KB) |\n"); taosFprintfFile(g_fp, "|%10d", g_stConfInfo.batchNumOfRow); } -int main(int32_t argc, char *argv[]) { +int main(int32_t argc, char* argv[]) { parseArgument(argc, argv); printParaIntoFile(); @@ -630,70 +635,70 @@ int main(int32_t argc, char *argv[]) { code = init_env(); if (code != 0) { fprintf(stderr, "%% init_env error!\n"); - return -1; + return -1; } int32_t totalMsgs = 0; if (g_stConfInfo.runMode != TMQ_RUN_ONLY_CONSUME) { - int64_t startTs = taosGetTimestampUs(); if (1 == g_stConfInfo.ratio) { - totalMsgs = syncWriteData(); + totalMsgs = syncWriteData(); } else { - totalMsgs = syncWriteDataByRatio(); - } - + totalMsgs = syncWriteDataByRatio(); + } + if (totalMsgs <= 0) { - pError("inset data error!\n"); + pError("inset data error!\n"); return -1; - } - int64_t endTs = taosGetTimestampUs(); - int64_t delay = endTs - startTs; - - int32_t totalRows = 0; - if (1 == g_stConfInfo.ratio) { - totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables; - } else { - totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio); - } - - float seconds = delay / 1000000.0; - float rowsSpeed = totalRows / seconds; - float msgsSpeed = totalMsgs / seconds; - - - if ((0 == g_stConfInfo.simCase) && (strlen(g_stConfInfo.vnodeWalPath))) { - walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); - if (walLogSize <= 0) { - printf("%s size incorrect!", g_stConfInfo.vnodeWalPath); - exit(-1); - } else { - pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0)); - } - } - - if (0 == g_stConfInfo.simCase) { - pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows, totalMsgs, seconds, rowsSpeed, msgsSpeed); } - taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.3f ", totalMsgs, seconds, msgsSpeed, (double)walLogSize/(1024 * 1024.0)); + int64_t endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + + int32_t totalRows = 0; + if (1 == g_stConfInfo.ratio) { + totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables; + } else { + totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio); + } + + float seconds = delay / 1000000.0; + float rowsSpeed = totalRows / seconds; + float msgsSpeed = totalMsgs / seconds; + + if ((0 == g_stConfInfo.simCase) && (strlen(g_stConfInfo.vnodeWalPath))) { + walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); + if (walLogSize <= 0) { + printf("%s size incorrect!", g_stConfInfo.vnodeWalPath); + exit(-1); + } else { + pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize / (1024 * 1024.0)); + } + } + + if (0 == g_stConfInfo.simCase) { + pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows, + totalMsgs, seconds, rowsSpeed, msgsSpeed); + } + taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.3f ", totalMsgs, seconds, msgsSpeed, + (double)walLogSize / (1024 * 1024.0)); } if (g_stConfInfo.runMode == TMQ_RUN_ONLY_INSERT) { return 0; } - - tmq_t* tmq = build_consumer(); + + tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); - if ((NULL == tmq) || (NULL == topic_list)){ + if ((NULL == tmq) || (NULL == topic_list)) { return -1; } - + perf_loop(tmq, topic_list, totalMsgs, walLogSize); taosMemoryFreeClear(g_pRowValue); - taosFprintfFile(g_fp, "\n"); - taosCloseFile(&g_fp); + taosFprintfFile(g_fp, "\n"); + taosCloseFile(&g_fp); return 0; } diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 7862788a713451bd9d6de362c5fd3cf34721be59..eaca8f151e9311fa113de29b6f1be838716f1239 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -35,8 +35,8 @@ #define MAX_ROW_STR_LEN (16 * 1024) typedef struct { - int32_t expectMsgCnt; - int32_t consumeMsgCnt; + int32_t expectMsgCnt; + int32_t consumeMsgCnt; TdThread thread; } SThreadInfo; @@ -45,12 +45,12 @@ typedef struct { char dbName[32]; char topicString[256]; char keyString[1024]; - char topicString1[256]; - char keyString1[1024]; + char topicString1[256]; + char keyString1[1024]; int32_t showMsgFlag; int32_t consumeDelay; // unit s int32_t consumeMsgCnt; - int32_t checkMode; + int32_t checkMode; // save result after parse agrvs int32_t numOfTopic; @@ -59,13 +59,13 @@ typedef struct { int32_t numOfKey; char key[32][64]; char value[32][64]; - - int32_t numOfTopic1; - char topics1[32][64]; - - int32_t numOfKey1; - char key1[32][64]; - char value1[32][64]; + + int32_t numOfTopic1; + char topics1[32][64]; + + int32_t numOfKey1; + char key1[32][64]; + char value1[32][64]; } SConfInfo; static SConfInfo g_stConfInfo; @@ -186,18 +186,18 @@ void parseInputString() { ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); g_stConfInfo.numOfTopic++; - + token = strtok(NULL, delim); - } + } token = strtok(g_stConfInfo.topicString1, delim); - while(token != NULL) { - //printf("%s\n", token ); - strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token); + while (token != NULL) { + // printf("%s\n", token ); + strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token); ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]); - //printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); - g_stConfInfo.numOfTopic1++; - + // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); + g_stConfInfo.numOfTopic1++; + token = strtok(NULL, delim); } @@ -214,28 +214,28 @@ void parseInputString() { // g_stConfInfo.value[g_stConfInfo.numOfKey]); g_stConfInfo.numOfKey++; } - + token = strtok(NULL, delim); } token = strtok(g_stConfInfo.keyString1, delim); - while(token != NULL) { - //printf("%s\n", token ); - { - char* pstr = token; - ltrim(pstr); - char *ret = strchr(pstr, ch); - memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret-pstr); - strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret+1); - //printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]); - g_stConfInfo.numOfKey1++; + while (token != NULL) { + // printf("%s\n", token ); + { + char* pstr = token; + ltrim(pstr); + char* ret = strchr(pstr, ch); + memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret - pstr); + strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret + 1); + // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], + // g_stConfInfo.value[g_stConfInfo.numOfKey]); + g_stConfInfo.numOfKey1++; } - + token = strtok(NULL, delim); } } - static int running = 1; /*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ @@ -253,6 +253,7 @@ int queryDB(TAOS* taos, char* command) { } tmq_t* build_consumer() { +#if 0 char sqlStr[1024] = {0}; TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); @@ -266,13 +267,19 @@ tmq_t* build_consumer() { exit(-1); } taos_free_result(pRes); +#endif tmq_conf_t* conf = tmq_conf_new(); // tmq_conf_set(conf, "group.id", "tg2"); for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) { tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]); } - tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); return tmq; } @@ -285,10 +292,10 @@ tmq_list_t* build_topic_list() { return topic_list; } - tmq_t* build_consumer_x() { +#if 0 char sqlStr[1024] = {0}; - + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -296,23 +303,29 @@ tmq_t* build_consumer_x() { TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { printf("error in use db, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - exit(-1); + taos_free_result(pRes); + exit(-1); } taos_free_result(pRes); +#endif tmq_conf_t* conf = tmq_conf_new(); - //tmq_conf_set(conf, "group.id", "tg2"); + // tmq_conf_set(conf, "group.id", "tg2"); for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) { tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]); } - tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); return tmq; } tmq_list_t* build_topic_list_x() { tmq_list_t* topic_list = tmq_list_new(); - //tmq_list_append(topic_list, "test_stb_topic_1"); + // tmq_list_append(topic_list, "test_stb_topic_1"); for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) { tmq_list_append(topic_list, g_stConfInfo.topics1[i]); } @@ -367,9 +380,9 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) { if (tmqMsg) { totalMsgs++; - //printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs); + // printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs); - #if 0 +#if 0 TAOS_ROW row; while (NULL != (row = tmq_get_row(tmqMsg))) { totalRows++; @@ -396,65 +409,63 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) { exit(-1); } - //printf("%d", totalMsgs); // output to sim for check result + // printf("%d", totalMsgs); // output to sim for check result return totalMsgs; } - -void *threadFunc(void *param) { +void* threadFunc(void* param) { int32_t totalMsgs = 0; - SThreadInfo *pInfo = (SThreadInfo *)param; + SThreadInfo* pInfo = (SThreadInfo*)param; - tmq_t* tmq = build_consumer_x(); + tmq_t* tmq = build_consumer_x(); tmq_list_t* topic_list = build_topic_list_x(); - if ((NULL == tmq) || (NULL == topic_list)){ + if ((NULL == tmq) || (NULL == topic_list)) { return NULL; } - + tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); if (err) { printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - //if (0 == g_stConfInfo.consumeMsgCnt) { - // loop_consume(tmq); - //} else { - pInfo->consumeMsgCnt = parallel_consume(tmq, 1); + // if (0 == g_stConfInfo.consumeMsgCnt) { + // loop_consume(tmq); + // } else { + pInfo->consumeMsgCnt = parallel_consume(tmq, 1); //} err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); - pInfo->consumeMsgCnt = -1; + pInfo->consumeMsgCnt = -1; return NULL; } return NULL; } - int main(int32_t argc, char* argv[]) { parseArgument(argc, argv); parseInputString(); - int32_t numOfThreads = 1; + int32_t numOfThreads = 1; TdThreadAttr thattr; taosThreadAttrInit(&thattr); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); - SThreadInfo *pInfo = (SThreadInfo *)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo)); + SThreadInfo* pInfo = (SThreadInfo*)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo)); if (g_stConfInfo.numOfTopic1) { // pthread_create one thread to consume for (int32_t i = 0; i < numOfThreads; ++i) { pInfo[i].expectMsgCnt = 0; pInfo[i].consumeMsgCnt = 0; - taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); + taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void*)(pInfo + i)); } } - int32_t totalMsgs = 0; + int32_t totalMsgs = 0; tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); if ((NULL == tmq) || (NULL == topic_list)) { @@ -479,48 +490,48 @@ int main(int32_t argc, char* argv[]) { exit(-1); } - if (g_stConfInfo.numOfTopic1) { + if (g_stConfInfo.numOfTopic1) { for (int32_t i = 0; i < numOfThreads; i++) { taosThreadJoin(pInfo[i].thread, NULL); } - //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); - if (0 == g_stConfInfo.checkMode) { - if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) { - printf("success"); + // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); + if (0 == g_stConfInfo.checkMode) { + if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) { + printf("success"); } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else if (1 == g_stConfInfo.checkMode) { + printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); + } + } else if (1 == g_stConfInfo.checkMode) { if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) { - printf("success"); + printf("success"); } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else if (2 == g_stConfInfo.checkMode) { - if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) { - printf("success"); + printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); + } + } else if (2 == g_stConfInfo.checkMode) { + if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) { + printf("success"); } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else if (3 == g_stConfInfo.checkMode) { + printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); + } + } else if (3 == g_stConfInfo.checkMode) { if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) { - printf("success"); + printf("success"); } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else if (4 == g_stConfInfo.checkMode) { - if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) - || ((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) - || ((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) - || ((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) { - printf("success"); + printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); + } + } else if (4 == g_stConfInfo.checkMode) { + if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) || + ((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) || + ((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) || + ((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) { + printf("success"); } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else { - printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } + printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); + } + } else { + printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); + } } return 0; diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index 8ef5c816c803d8c6b34b4c85f9244047dc1db20f..d71323336292cccf3d4cdb951c3ce457bded0667 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -678,7 +678,7 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { if (tt < 0) tt = 0; #endif - tp = localtime(&tt); + tp = taosLocalTime(&tt, NULL); strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", tp); if (precision == TSDB_TIME_PRECISION_MILLI) { sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000)); diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index d8833626365b3a82dc2868b50e74e397992e06ec..ac2010efa3a759c79073aca3e935d278bd2ab31e 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -454,7 +454,7 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) { } } - struct tm *ptm = localtime(&tt); + struct tm *ptm = taosLocalTime(&tt, NULL); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); if (precision == TSDB_TIME_PRECISION_NANO) { diff --git a/tools/taosTools-1.4.1-Linux-x64.tar.gz b/tools/taosTools-1.4.1-Linux-x64.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..caefa3d95768cc0551bdf2f9ba9b5631942b9e5a Binary files /dev/null and b/tools/taosTools-1.4.1-Linux-x64.tar.gz differ