提交 773a09e5 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

...@@ -141,7 +141,7 @@ int32_t create_topic() { ...@@ -141,7 +141,7 @@ int32_t create_topic() {
return 0; return 0;
} }
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) {
printf("commit %d\n", resp); printf("commit %d\n", resp);
} }
...@@ -163,7 +163,7 @@ tmq_t* build_consumer() { ...@@ -163,7 +163,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", "abc1"); tmq_conf_set(conf, "td.connect.db", "abc1");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
return tmq; return tmq;
} }
...@@ -189,7 +189,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -189,7 +189,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
cnt++; cnt++;
/*printf("get data\n");*/ /*printf("get data\n");*/
/*msg_process(tmqmessage);*/ /*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage); taos_free_result(tmqmessage);
/*} else {*/ /*} else {*/
/*break;*/ /*break;*/
} }
...@@ -219,7 +219,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -219,7 +219,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) { if (tmqmessage) {
msg_process(tmqmessage); msg_process(tmqmessage);
tmq_message_destroy(tmqmessage); taos_free_result(tmqmessage);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
} }
...@@ -249,7 +249,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -249,7 +249,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
batchCnt++; batchCnt++;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/ /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/ /*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage); taos_free_result(tmqmessage);
} else { } else {
break; break;
} }
......
...@@ -217,17 +217,17 @@ typedef struct tmq_conf_t tmq_conf_t; ...@@ -217,17 +217,17 @@ typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
// typedef struct tmq_message_t tmq_message_t; // typedef struct tmq_message_t tmq_message_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param)); typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
#if 1 #if 0
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
#endif #endif
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
...@@ -271,14 +271,19 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); ...@@ -271,14 +271,19 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO
#if 0
DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res);
#endif
#if 0 #if 0
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic); DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic);
DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic); DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic);
#endif
DLL_EXPORT void tmq_message_destroy(TAOS_RES *res); DLL_EXPORT void tmq_message_destroy(TAOS_RES *res);
#endif
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0 #if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
......
...@@ -331,6 +331,8 @@ typedef struct { ...@@ -331,6 +331,8 @@ typedef struct {
int32_t pid; int32_t pid;
char app[TSDB_APP_NAME_LEN]; char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
char user[TSDB_USER_LEN];
char passwd[TSDB_PASSWORD_LEN];
int64_t startTime; int64_t startTime;
} SConnectReq; } SConnectReq;
...@@ -482,7 +484,7 @@ typedef struct { ...@@ -482,7 +484,7 @@ typedef struct {
char intervalUnit; char intervalUnit;
char slidingUnit; char slidingUnit;
char char
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision; int8_t precision;
int64_t interval; int64_t interval;
int64_t sliding; int64_t sliding;
......
...@@ -30,6 +30,7 @@ typedef struct SPlanContext { ...@@ -30,6 +30,7 @@ typedef struct SPlanContext {
SNode* pAstRoot; SNode* pAstRoot;
bool topicQuery; bool topicQuery;
bool streamQuery; bool streamQuery;
bool rSmaQuery;
bool showRewrite; bool showRewrite;
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
......
...@@ -30,6 +30,15 @@ bin_dir="/usr/local/taos/bin" ...@@ -30,6 +30,15 @@ bin_dir="/usr/local/taos/bin"
service_config_dir="/etc/systemd/system" service_config_dir="/etc/systemd/system"
#taos-tools para
demoName="taosdemo"
benchmarkName="taosBenchmark"
dumpName="taosdump"
emailName="taosdata.com"
taosName="taos"
toolsName="taostools"
# Color setting # Color setting
RED='\033[0;31m' RED='\033[0;31m'
GREEN='\033[1;32m' GREEN='\033[1;32m'
...@@ -230,8 +239,20 @@ function install_header() { ...@@ -230,8 +239,20 @@ function install_header() {
# temp install taosBenchmark # temp install taosBenchmark
function install_taosTools() { function install_taosTools() {
cd ${script_dir}/taos-tools/ ${csudo} rm -f ${bin_link_dir}/${benchmarkName} || :
tar xvf taosTools-1.4.1-Linux-x64.tar.gz && cd taosTools-1.4.1/ && ./install-taostools.sh ${csudo} rm -f ${bin_link_dir}/${dumpName} || :
${csudo} rm -f ${bin_link_dir}/rm${toolsName} || :
${csudo} /usr/bin/install -c -m 755 ${script_dir}/bin/${dumpName} ${install_main_dir}/bin/${dumpName}
${csudo} /usr/bin/install -c -m 755 ${script_dir}/bin/${benchmarkName} ${install_main_dir}/bin/${benchmarkName}
${csudo} ln -sf ${install_main_dir}/bin/${benchmarkName} ${install_main_dir}/bin/${demoName}
#Make link
[[ -x ${install_main_dir}/bin/${benchmarkName} ]] && \
${csudo} ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || :
[[ -x ${install_main_dir}/bin/${demoName} ]] && \
${csudo} ln -s ${install_main_dir}/bin/${demoName} ${bin_link_dir}/${demoName} || :
[[ -x ${install_main_dir}/bin/${dumpName} ]] && \
${csudo} ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName} || :
} }
function add_newHostname_to_hosts() { function add_newHostname_to_hosts() {
......
...@@ -39,7 +39,7 @@ cd ${compile_dir} ...@@ -39,7 +39,7 @@ cd ${compile_dir}
echo "compile_dir: ${compile_dir}" echo "compile_dir: ${compile_dir}"
cmake .. cmake .. -DBUILD_TOOLS=true
make -j32 make -j32
release_dir="${top_dir}/release" release_dir="${top_dir}/release"
...@@ -55,7 +55,6 @@ mkdir -p ${install_dir} ...@@ -55,7 +55,6 @@ mkdir -p ${install_dir}
mkdir -p ${install_dir}/bin mkdir -p ${install_dir}/bin
mkdir -p ${install_dir}/lib mkdir -p ${install_dir}/lib
mkdir -p ${install_dir}/inc mkdir -p ${install_dir}/inc
mkdir -p ${install_dir}/taos-tools
install_files="${script_dir}/install.sh" install_files="${script_dir}/install.sh"
chmod a+x ${script_dir}/install.sh || : chmod a+x ${script_dir}/install.sh || :
...@@ -64,13 +63,14 @@ cp ${install_files} ${install_dir} ...@@ -64,13 +63,14 @@ cp ${install_files} ${install_dir}
header_files="${top_dir}/include/client/taos.h ${top_dir}/include/util/taoserror.h" header_files="${top_dir}/include/client/taos.h ${top_dir}/include/util/taoserror.h"
cp ${header_files} ${install_dir}/inc cp ${header_files} ${install_dir}/inc
bin_files="${compile_dir}/build/bin/taosd ${compile_dir}/build/bin/taos ${compile_dir}/build/bin/create_table ${compile_dir}/build/bin/tmq_sim ${script_dir}/remove.sh" bin_files="${compile_dir}/source/dnode/mgmt/taosd ${compile_dir}/tools/shell/taos ${compile_dir}/tests/test/c/create_table ${compile_dir}/tests/test/c/tmq_sim ${script_dir}/remove.sh ${compile_dir}/build/bin/taosBenchmark ${compile_dir}/build/bin/taosdump"
cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || : cp -rf ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
cp -rf ${compile_dir}/source/client/libtaos.so ${install_dir}/lib/
cp -rf ${compile_dir}/source/libs/tdb/libtdb.so ${install_dir}/lib/
cp -rf ${compile_dir}/build/lib/libavro* ${install_dir}/lib/ > /dev/null || echo -e "failed to copy avro libraries"
cp -rf ${compile_dir}/build/lib/pkgconfig ${install_dir}/lib/ > /dev/null || echo -e "failed to copy pkgconfig directory"
cp ${compile_dir}/build/lib/libtaos.so ${install_dir}/lib/
cp ${compile_dir}/build/lib/libtdb.so ${install_dir}/lib/
taostoolfile="${top_dir}/tools/taosTools-1.4.1-Linux-x64.tar.gz"
cp ${taostoolfile} ${install_dir}/taos-tools
#cp ${compile_dir}/source/dnode/mnode/impl/libmnode.so ${install_dir}/lib/ #cp ${compile_dir}/source/dnode/mnode/impl/libmnode.so ${install_dir}/lib/
#cp ${compile_dir}/source/dnode/qnode/libqnode.so ${install_dir}/lib/ #cp ${compile_dir}/source/dnode/qnode/libqnode.so ${install_dir}/lib/
......
...@@ -306,7 +306,6 @@ void hbMgrInitMqHbRspHandle(); ...@@ -306,7 +306,6 @@ void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery); SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -146,7 +146,8 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* ...@@ -146,7 +146,8 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
(*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, sizeof((*pRequest)->self))) { if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
sizeof((*pRequest)->self))) {
destroyRequest(*pRequest); destroyRequest(*pRequest);
*pRequest = NULL; *pRequest = NULL;
tscError("put request to request hash failed"); tscError("put request to request hash failed");
...@@ -263,7 +264,8 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t ...@@ -263,7 +264,8 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
} }
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO && precision != TSDB_TIME_PRECISION_NANO) { if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
precision != TSDB_TIME_PRECISION_NANO) {
return; return;
} }
...@@ -275,7 +277,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -275,7 +277,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res); pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) { if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob); schedulerFreeJob(pRequest->body.queryJob);
...@@ -300,7 +302,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -300,7 +302,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
} }
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) { SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
switch (pQuery->execMode) { switch (pQuery->execMode) {
...@@ -328,7 +330,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code ...@@ -328,7 +330,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
if (!keepQuery) { if (!keepQuery) {
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
} }
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno; pRequest->code = terrno;
} }
...@@ -336,7 +338,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code ...@@ -336,7 +338,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
return pRequest; return pRequest;
} }
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
...@@ -522,6 +523,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) { ...@@ -522,6 +523,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
connectReq.pid = htonl(appInfo.pid); connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime); connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = taosMemoryMalloc(contLen); void* pReq = taosMemoryMalloc(contLen);
...@@ -726,7 +729,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -726,7 +729,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p)); int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
ASSERT(len <= bytes); ASSERT(len <= bytes);
ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i])); ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));
varDataSetLen(p, len); varDataSetLen(p, len);
pCol->offset[j] = (p - pResultInfo->convertBuf[i]); pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
p += (len + VARSTR_HEADER_SIZE); p += (len + VARSTR_HEADER_SIZE);
...@@ -744,51 +747,55 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -744,51 +747,55 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
} }
pResultInfo->convertBuf[i] = p; pResultInfo->convertBuf[i] = p;
int32_t len = 0; int32_t len = 0;
SResultColumn* pCol = &pResultInfo->pCol[i]; SResultColumn* pCol = &pResultInfo->pCol[i];
for (int32_t j = 0; j < numOfRows; ++j) { for (int32_t j = 0; j < numOfRows; ++j) {
if (pCol->offset[j] != -1) { if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData; char* pStart = pCol->offset[j] + pCol->pData;
int32_t jsonInnerType = *pStart; int32_t jsonInnerType = *pStart;
char *jsonInnerData = pStart + CHAR_BYTES; char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0}; char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
if(jsonInnerType == TSDB_DATA_TYPE_NULL){ if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
}else if(jsonInnerType == TSDB_DATA_TYPE_JSON){ } else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst)); int32_t length =
taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
if (length <= 0) { if (length <= 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, varDataVal(jsonInnerData)); tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
varDataVal(jsonInnerData));
length = 0; length = 0;
} }
varDataSetLen(dst, length); varDataSetLen(dst, length);
}else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value" } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"'; *(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst) + CHAR_BYTES); int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
varDataVal(dst) + CHAR_BYTES);
if (length <= 0) { if (length <= 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, varDataVal(jsonInnerData)); tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
varDataVal(jsonInnerData));
length = 0; length = 0;
} }
varDataSetLen(dst, length + CHAR_BYTES*2); varDataSetLen(dst, length + CHAR_BYTES * 2);
*(char*)(varDataVal(dst), length + CHAR_BYTES) = '\"'; *(char*)(varDataVal(dst), length + CHAR_BYTES) = '\"';
}else if(jsonInnerType == TSDB_DATA_TYPE_DOUBLE){ } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(jsonInnerData); double jsonVd = *(double*)(jsonInnerData);
sprintf(varDataVal(dst), "%.9lf", jsonVd); sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
}else if(jsonInnerType == TSDB_DATA_TYPE_BIGINT){ } else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(jsonInnerData); int64_t jsonVd = *(int64_t*)(jsonInnerData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd); sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
}else if(jsonInnerType == TSDB_DATA_TYPE_BOOL){ } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char *)jsonInnerData) == 1) ? "true" : "false"); sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
}else { } else {
ASSERT(0); ASSERT(0);
} }
if(len + varDataTLen(dst) > colLength[i]){ if (len + varDataTLen(dst) > colLength[i]) {
p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst)); p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "tqueue.h" #include "tqueue.h"
#include "tref.h" #include "tref.h"
#if 0
struct tmq_message_t { struct tmq_message_t {
SMqPollRsp msg; SMqPollRsp msg;
char* topic; char* topic;
...@@ -31,6 +32,7 @@ struct tmq_message_t { ...@@ -31,6 +32,7 @@ struct tmq_message_t {
int32_t vgId; int32_t vgId;
int32_t resIter; int32_t resIter;
}; };
#endif
typedef struct { typedef struct {
int8_t tmqRspType; int8_t tmqRspType;
...@@ -52,9 +54,7 @@ struct tmq_topic_vgroup_t { ...@@ -52,9 +54,7 @@ struct tmq_topic_vgroup_t {
}; };
struct tmq_topic_vgroup_list_t { struct tmq_topic_vgroup_list_t {
int32_t cnt; SArray container; // SArray<tmq_topic_vgroup_t*>
int32_t size;
tmq_topic_vgroup_t* elems;
}; };
struct tmq_conf_t { struct tmq_conf_t {
...@@ -63,6 +63,7 @@ struct tmq_conf_t { ...@@ -63,6 +63,7 @@ struct tmq_conf_t {
int8_t autoCommit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
uint16_t port; uint16_t port;
uint16_t autoCommitInterval;
char* ip; char* ip;
char* user; char* user;
char* pass; char* pass;
...@@ -202,6 +203,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -202,6 +203,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
} }
if (strcmp(key, "auto.commit.interval.ms") == 0) {
conf->autoCommitInterval = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "auto.offset.reset") == 0) { if (strcmp(key, "auto.offset.reset") == 0) {
if (strcmp(value, "none") == 0) { if (strcmp(value, "none") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE; conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
...@@ -300,7 +306,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -300,7 +306,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
if (pParam->tmq->commit_cb) { if (pParam->tmq->commit_cb) {
pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL); pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL);
} }
if (!pParam->async) tsem_post(&pParam->rspSem); if (!pParam->async) tsem_post(&pParam->rspSem);
return 0; return 0;
...@@ -322,6 +328,7 @@ tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) { ...@@ -322,6 +328,7 @@ tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
return tmq_subscribe(tmq, lst); return tmq_subscribe(tmq, lst);
} }
#if 0
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1); tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
if (pTmq == NULL) { if (pTmq == NULL) {
...@@ -357,8 +364,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -357,8 +364,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return pTmq; return pTmq;
} }
#endif
tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) { if (pTmq == NULL) {
return NULL; return NULL;
...@@ -369,6 +377,7 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -369,6 +377,7 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
ASSERT(user); ASSERT(user);
ASSERT(pass); ASSERT(pass);
ASSERT(conf->db); ASSERT(conf->db);
ASSERT(conf->groupId[0]);
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ); pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) return NULL; if (pTmq->pTscObj == NULL) return NULL;
...@@ -429,8 +438,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -429,8 +438,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
req.num = pArray->size; req.num = pArray->size;
req.offsets = pArray->pData; req.offsets = pArray->pData;
} else { } else {
req.num = offsets->cnt; req.num = taosArrayGetSize(&offsets->container);
req.offsets = (SMqOffset*)offsets->elems; req.offsets = (SMqOffset*)offsets->container.pData;
} }
SCoder encoder; SCoder encoder;
...@@ -1538,16 +1547,6 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v ...@@ -1538,16 +1547,6 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
} }
#endif #endif
#if 0
void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
SMqPollRsp* pRsp = &tmq_message->msg;
tDeleteSMqConsumeRsp(pRsp);
/*taosMemoryFree(tmq_message);*/
taosFreeQitem(tmq_message);
}
#endif
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
const char* tmq_err2str(tmq_resp_err_t err) { const char* tmq_err2str(tmq_resp_err_t err) {
......
...@@ -108,7 +108,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) { ...@@ -108,7 +108,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
while (1) { while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg); taos_free_result(msg);
//printf("get msg\n"); //printf("get msg\n");
//if (msg == NULL) break; //if (msg == NULL) break;
} }
...@@ -141,7 +141,7 @@ TEST(testCase, tmq_subscribe_stb_Test) { ...@@ -141,7 +141,7 @@ TEST(testCase, tmq_subscribe_stb_Test) {
tmq_commit(tmq, NULL, 0); tmq_commit(tmq, NULL, 0);
} }
//tmq_commit(tmq, NULL, 0); //tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg); taos_free_result(msg);
//printf("get msg\n"); //printf("get msg\n");
} }
} }
......
...@@ -2756,6 +2756,8 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { ...@@ -2756,6 +2756,8 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
if (tEncodeI32(&encoder, pReq->pid) < 0) return -1; if (tEncodeI32(&encoder, pReq->pid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1; if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->passwd) < 0) return -1;
if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1; if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2773,6 +2775,8 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { ...@@ -2773,6 +2775,8 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->passwd) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1; if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
...@@ -3032,7 +3036,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq ...@@ -3032,7 +3036,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
return 0; return 0;
} }
int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) { int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
...@@ -3052,7 +3055,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq ...@@ -3052,7 +3055,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
SReplica *pReplica = &pReq->replicas[i]; SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
} }
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -3085,7 +3088,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR ...@@ -3085,7 +3088,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
return 0; return 0;
} }
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) { int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......
...@@ -119,10 +119,10 @@ _OVER: ...@@ -119,10 +119,10 @@ _OVER:
} }
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans * pTrans = &pDnode->trans;
tmsg_t msgType = pMsg->msgType; tmsg_t msgType = pMsg->msgType;
bool isReq = msgType & 1u; bool isReq = msgType & 1u;
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = pHandle->pNdWrapper; SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
if (msgType == TDMT_DND_SERVER_STATUS) { if (msgType == TDMT_DND_SERVER_STATUS) {
...@@ -443,7 +443,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s ...@@ -443,7 +443,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq authReq = {0}; SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN); tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen); void * pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq); tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
...@@ -523,4 +523,4 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { ...@@ -523,4 +523,4 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
.pWrapper = pWrapper, .pWrapper = pWrapper,
}; };
return msgCb; return msgCb;
} }
\ No newline at end of file
...@@ -29,6 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -29,6 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -24,20 +24,20 @@ ...@@ -24,20 +24,20 @@
#include "version.h" #include "version.h"
typedef struct { typedef struct {
uint32_t id; uint32_t id;
int8_t connType; int8_t connType;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time int64_t appStartTimeMs; // app start time
int32_t pid; // pid of app that invokes taosc int32_t pid; // pid of app that invokes taosc
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int8_t killed; int8_t killed;
int64_t loginTimeMs; int64_t loginTimeMs;
int64_t lastAccessTimeMs; int64_t lastAccessTimeMs;
uint64_t killId; uint64_t killId;
int32_t numOfQueries; int32_t numOfQueries;
SArray *pQueries; //SArray<SQueryDesc> SArray * pQueries; // SArray<SQueryDesc>
} SConnObj; } SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port, static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
...@@ -45,7 +45,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType ...@@ -45,7 +45,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
static void mndFreeConn(SConnObj *pConn); static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId); static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter); static void * mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq); static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq);
static int32_t mndProcessConnectReq(SNodeMsg *pReq); static int32_t mndProcessConnectReq(SNodeMsg *pReq);
...@@ -71,9 +71,9 @@ int32_t mndInitProfile(SMnode *pMnode) { ...@@ -71,9 +71,9 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries); // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
return 0; return 0;
...@@ -91,7 +91,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType ...@@ -91,7 +91,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
int32_t pid, const char *app, int64_t startTime) { int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
char connStr[255] = {0}; char connStr[255] = {0};
int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app); int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
int32_t connId = mndGenerateUid(connStr, len); int32_t connId = mndGenerateUid(connStr, len);
if (startTime == 0) startTime = taosGetTimestampMs(); if (startTime == 0) startTime = taosGetTimestampMs();
...@@ -174,10 +174,10 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { ...@@ -174,10 +174,10 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
} }
static int32_t mndProcessConnectReq(SNodeMsg *pReq) { static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode * pMnode = pReq->pNode;
SUserObj *pUser = NULL; SUserObj * pUser = NULL;
SDbObj *pDb = NULL; SDbObj * pDb = NULL;
SConnObj *pConn = NULL; SConnObj * pConn = NULL;
int32_t code = -1; int32_t code = -1;
SConnectReq connReq = {0}; SConnectReq connReq = {0};
char ip[30] = {0}; char ip[30] = {0};
...@@ -194,6 +194,11 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -194,6 +194,11 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr()); mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
goto CONN_OVER; goto CONN_OVER;
} }
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
mError("user:%s, failed to auth while acquire user\n %s \r\n %s", pReq->user, connReq.passwd, pUser->pass);
code = TSDB_CODE_RPC_AUTH_FAILURE;
goto CONN_OVER;
}
if (connReq.db[0]) { if (connReq.db[0]) {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
...@@ -253,8 +258,8 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { ...@@ -253,8 +258,8 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn->pQueries = pBasic->queryDesc; pConn->pQueries = pBasic->queryDesc;
pBasic->queryDesc = NULL; pBasic->queryDesc = NULL;
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -324,9 +329,10 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { ...@@ -324,9 +329,10 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
return NULL; return NULL;
} }
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) { static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
SClientHbBatchRsp *pBatchRsp) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
if (pHbReq->query) { if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query; SQueryHbReqBasic *pBasic = pHbReq->query;
...@@ -335,8 +341,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -335,8 +341,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
rpcGetConnInfo(pMsg->handle, &connInfo); rpcGetConnInfo(pMsg->handle, &connInfo);
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) { if (pConn == NULL) {
pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0); pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
pBasic->pid, pBasic->app, 0);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr()); mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
return -1; return -1;
...@@ -345,7 +352,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -345,7 +352,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
} }
} else if (pConn->killed) { } else if (pConn->killed) {
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id); mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_MND_INVALID_CONNECTION; terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1; return -1;
} }
...@@ -369,8 +376,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -369,8 +376,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
} }
rspBasic->connId = pConn->id; rspBasic->connId = pConn->id;
rspBasic->totalDnodes = 1; //TODO rspBasic->totalDnodes = 1; // TODO
rspBasic->onlineDnodes = 1; //TODO rspBasic->onlineDnodes = 1; // TODO
mndGetMnodeEpSet(pMnode, &rspBasic->epSet); mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
...@@ -379,7 +386,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -379,7 +386,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
int32_t kvNum = taosHashGetSize(pHbReq->info); int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) { if (NULL == pHbReq->info || kvNum <= 0) {
taosArrayPush(pBatchRsp->rsps, &hbRsp); taosArrayPush(pBatchRsp->rsps, &hbRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -396,7 +403,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -396,7 +403,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
switch (kv->key) { switch (kv->key) {
case HEARTBEAT_KEY_DBINFO: { case HEARTBEAT_KEY_DBINFO: {
void *rspMsg = NULL; void * rspMsg = NULL;
int32_t rspLen = 0; int32_t rspLen = 0;
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen); mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) { if (rspMsg && rspLen > 0) {
...@@ -406,7 +413,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -406,7 +413,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
break; break;
} }
case HEARTBEAT_KEY_STBINFO: { case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL; void * rspMsg = NULL;
int32_t rspLen = 0; int32_t rspLen = 0;
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen); mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) { if (rspMsg && rspLen > 0) {
...@@ -457,7 +464,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { ...@@ -457,7 +464,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq); taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp); int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
void *buf = rpcMallocCont(tlen); void * buf = rpcMallocCont(tlen);
tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp); tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps); int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
...@@ -479,7 +486,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { ...@@ -479,7 +486,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
} }
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode * pMnode = pReq->pNode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
...@@ -513,7 +520,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { ...@@ -513,7 +520,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
} }
static int32_t mndProcessKillConnReq(SNodeMsg *pReq) { static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode * pMnode = pReq->pNode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
...@@ -545,11 +552,11 @@ static int32_t mndProcessKillConnReq(SNodeMsg *pReq) { ...@@ -545,11 +552,11 @@ static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
} }
static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode; SMnode * pMnode = pReq->pNode;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t cols = 0; int32_t cols = 0;
char *pWrite; char * pWrite;
char ipStr[TSDB_IPv4ADDR_LEN + 6]; char ipStr[TSDB_IPv4ADDR_LEN + 6];
if (pShow->pIter == NULL) { if (pShow->pIter == NULL) {
...@@ -604,8 +611,8 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int ...@@ -604,8 +611,8 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int
} }
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
int32_t numOfRows = 0; int32_t numOfRows = 0;
#if 0 #if 0
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t cols = 0; int32_t cols = 0;
...@@ -703,7 +710,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i ...@@ -703,7 +710,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i
} }
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
#endif #endif
return numOfRows; return numOfRows;
} }
......
...@@ -34,6 +34,54 @@ ...@@ -34,6 +34,54 @@
extern bool tsStreamSchedV; extern bool tsStreamSchedV;
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) {
SNode* pAst = NULL;
SQueryPlan* pPlan = NULL;
terrno = TSDB_CODE_SUCCESS;
if (nodesStringToNode(ast, &pAst) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.rSmaQuery = true,
.triggerType = triggerType,
.watermark = watermark,
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
int32_t opNum = LIST_LENGTH(inner->pNodeList);
if (opNum != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
if (qSubPlanToString(plan, pStr, pLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
END:
if (pAst) nodesDestroyNode(pAst);
if (pPlan) nodesDestroyNode(pPlan);
return terrno;
}
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder; SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
......
...@@ -30,8 +30,15 @@ int32_t MndTestProfile::connId; ...@@ -30,8 +30,15 @@ int32_t MndTestProfile::connId;
TEST_F(MndTestProfile, 01_ConnectMsg) { TEST_F(MndTestProfile, 01_ConnectMsg) {
SConnectReq connectReq = {0}; SConnectReq connectReq = {0};
connectReq.pid = 1234; connectReq.pid = 1234;
char passwd[] = "taosdata";
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt);
strcpy(connectReq.app, "mnode_test_profile"); strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, ""); strcpy(connectReq.db, "");
strcpy(connectReq.user, "root");
strcpy(connectReq.passwd, secretEncrypt);
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
...@@ -58,10 +65,16 @@ TEST_F(MndTestProfile, 01_ConnectMsg) { ...@@ -58,10 +65,16 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
} }
TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) { TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
char passwd[] = "taosdata";
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt);
SConnectReq connectReq = {0}; SConnectReq connectReq = {0};
connectReq.pid = 1234; connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile"); strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "invalid_db"); strcpy(connectReq.db, "invalid_db");
strcpy(connectReq.user, "root");
strcpy(connectReq.passwd, secretEncrypt);
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
......
...@@ -54,10 +54,16 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { ...@@ -54,10 +54,16 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
} }
TEST_F(MndTestShow, 03_ShowMsg_Conn) { TEST_F(MndTestShow, 03_ShowMsg_Conn) {
char passwd[] = "taosdata";
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt);
SConnectReq connectReq = {0}; SConnectReq connectReq = {0};
connectReq.pid = 1234; connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_show"); strcpy(connectReq.app, "mnode_test_show");
strcpy(connectReq.db, ""); strcpy(connectReq.db, "");
strcpy(connectReq.user, "root");
strcpy(connectReq.passwd, secretEncrypt);
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
...@@ -74,4 +80,4 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) { ...@@ -74,4 +80,4 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
TEST_F(MndTestShow, 04_ShowMsg_Cluster) { TEST_F(MndTestShow, 04_ShowMsg_Cluster) {
test.SendShowReq(TSDB_MGMT_TABLE_CLUSTER, "cluster", ""); test.SendShowReq(TSDB_MGMT_TABLE_CLUSTER, "cluster", "");
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
} }
\ No newline at end of file
...@@ -109,8 +109,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList ...@@ -109,8 +109,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows);
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
// need to reposition // need to reposition
......
...@@ -82,16 +82,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -82,16 +82,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false; return false;
} }
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) {
// currently only rows are used
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
return 0;
}
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
/*int32_t sversion = pHandle->pBlock->sversion;*/ /*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion // TODO set to real sversion
int32_t sversion = 0; int32_t sversion = 0;
...@@ -112,7 +103,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -112,7 +103,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
STSchema* pTschema = pHandle->pSchema; STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
int32_t numOfRows = pHandle->pBlock->numOfRows; *pNumOfRows = pHandle->pBlock->numOfRows;
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/ /*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
...@@ -120,10 +111,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -120,10 +111,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colNumNeed = pSchemaWrapper->nCols; colNumNeed = pSchemaWrapper->nCols;
} }
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); *ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (pArray == NULL) { if (*ppCols == NULL) {
return NULL; return -1;
} }
int32_t colMeta = 0; int32_t colMeta = 0;
int32_t colNeed = 0; int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) { while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
...@@ -136,21 +128,24 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -136,21 +128,24 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colNeed++; colNeed++;
} else { } else {
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
/*int sz = numOfRows * pColSchema->bytes;*/
colInfo.info.bytes = pColSchema->bytes; colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = pColSchema->colId; colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type; colInfo.info.type = pColSchema->type;
if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) { if (colInfoDataEnsureCapacity(&colInfo, 0, *pNumOfRows) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); goto FAIL;
return NULL;
} }
taosArrayPush(pArray, &colInfo); taosArrayPush(*ppCols, &colInfo);
colMeta++; colMeta++;
colNeed++; colNeed++;
} }
} }
int32_t colActual = taosArrayGetSize(*ppCols);
// TODO in stream shuffle case, fetch groupId
*pGroupId = 0;
STSRowIter iter = {0}; STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterInit(&iter, pTschema);
STSRow* row; STSRow* row;
...@@ -159,22 +154,22 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -159,22 +154,22 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
// get all wanted col of that block // get all wanted col of that block
int32_t colTot = taosArrayGetSize(pArray); for (int32_t i = 0; i < colActual; i++) {
for (int32_t i = 0; i < colTot; i++) { SColumnInfoData* pColData = taosArrayGet(*ppCols, i);
SColumnInfoData* pColData = taosArrayGet(pArray, i);
SCellVal sVal = {0}; SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break; break;
} }
/*if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {*/
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); goto FAIL;
return NULL;
} }
} }
curRow++; curRow++;
} }
return pArray; return 0;
FAIL:
taosArrayDestroy(*ppCols);
return -1;
} }
void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
......
...@@ -716,7 +716,8 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp ...@@ -716,7 +716,8 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false"); int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false");
varDataSetLen(output, len); varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_BINARY) { } else if (inputType == TSDB_DATA_TYPE_BINARY) {
int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), varDataVal(input)); int32_t len = MIN(varDataLen(input), outputLen - VARSTR_HEADER_SIZE);
len = sprintf(varDataVal(output), "%.*s", len, varDataVal(input));
varDataSetLen(output, len); varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) { } else if (inputType == TSDB_DATA_TYPE_NCHAR) {
//not support //not support
......
...@@ -158,6 +158,7 @@ typedef struct { ...@@ -158,6 +158,7 @@ typedef struct {
char secured : 2; char secured : 2;
char spi : 2; char spi : 2;
char user[TSDB_UNI_LEN];
uint64_t ahandle; // ahandle assigned by client uint64_t ahandle; // ahandle assigned by client
uint32_t code; // del later uint32_t code; // del later
uint32_t msgType; uint32_t msgType;
...@@ -186,23 +187,23 @@ typedef enum { Normal, Quit, Release, Register } STransMsgType; ...@@ -186,23 +187,23 @@ typedef enum { Normal, Quit, Release, Register } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U) #define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) #define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) #define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); #define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U) #define transIsReq(type) (type & 1U)
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
......
...@@ -614,35 +614,16 @@ void cliSend(SCliConn* pConn) { ...@@ -614,35 +614,16 @@ void cliSend(SCliConn* pConn) {
pMsg->pCont = (void*)rpcMallocCont(0); pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0; pMsg->contLen = 0;
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
int msgLen = transMsgLenFromCont(pMsg->contLen); int msgLen = transMsgLenFromCont(pMsg->contLen);
if (!pConn->secured) { STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
char* buf = taosMemoryCalloc(1, msgLen + sizeof(STransUserMsg)); pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
memcpy(buf, (char*)pHead, msgLen);
STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen);
memcpy(uMsg->user, pTransInst->user, tListLen(uMsg->user));
memcpy(uMsg->secret, pTransInst->secret, tListLen(uMsg->secret));
// to avoid mem leak
destroyUserdata(pMsg);
pMsg->pCont = (char*)buf + sizeof(STransMsgHead);
pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead);
pHead = (STransMsgHead*)buf;
pHead->secured = 1;
msgLen += sizeof(STransUserMsg);
}
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
......
...@@ -46,7 +46,6 @@ typedef struct SSrvConn { ...@@ -46,7 +46,6 @@ typedef struct SSrvConn {
struct sockaddr_in addr; struct sockaddr_in addr;
struct sockaddr_in locaddr; struct sockaddr_in locaddr;
char secured;
int spi; int spi;
char info[64]; char info[64];
char user[TSDB_UNI_LEN]; // user ID for the link char user[TSDB_UNI_LEN]; // user ID for the link
...@@ -104,6 +103,13 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) ...@@ -104,6 +103,13 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle); static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvShutDownCb(uv_shutdown_t* req, int status);
/*
* time-consuming task throwed into BG work thread
*/
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);
static void uvWalkCb(uv_handle_t* handle, void* arg); static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle); static void uvFreeCb(uv_handle_t* handle);
...@@ -181,16 +187,16 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -181,16 +187,16 @@ static void uvHandleReq(SSrvConn* pConn) {
uint32_t msgLen = pBuf->len; uint32_t msgLen = pBuf->len;
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
if (pHead->secured == 1) {
STransUserMsg* uMsg = (STransUserMsg*)((char*)msg + msgLen - sizeof(STransUserMsg));
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
}
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
if (pHead->secured == 1) { memcpy(pConn->user, pHead->user, strlen(pHead->user));
pHead->msgLen -= sizeof(STransUserMsg);
} // TODO(dengyihao): time-consuming task throwed into BG Thread
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
// wreq->data = pConn;
// uv_read_stop((uv_stream_t*)pConn->pTcp);
// transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
CONN_SHOULD_RELEASE(pConn, pHead); CONN_SHOULD_RELEASE(pConn, pHead);
...@@ -344,12 +350,6 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { ...@@ -344,12 +350,6 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pMsg->ahandle; pHead->ahandle = (uint64_t)pMsg->ahandle;
// pHead->secured = pMsg->code == 0 ? 1 : 0; //
if (!pConn->secured) {
pConn->secured = pMsg->code == 0 ? 1 : 0;
}
pHead->secured = pConn->secured;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1; pHead->msgType = pConn->inType + 1;
} else { } else {
...@@ -464,6 +464,24 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { ...@@ -464,6 +464,24 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
taosMemoryFree(req); taosMemoryFree(req);
} }
static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task
// only auth conn currently, add more func later
tTrace("server conn %p start to be processed in BG Thread", req->data);
return;
}
static void uvWorkAfterTask(uv_work_t* req, int status) {
if (status != 0) {
tTrace("server conn %p failed to processed ", req->data);
}
// Done time-consumeing task
// add more func later
// this func called in main loop
tTrace("server conn %p already processed ", req->data);
taosMemoryFree(req);
}
void uvOnAcceptCb(uv_stream_t* stream, int status) { void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (status == -1) { if (status == -1) {
return; return;
......
!/bin/bash #!/bin/bash
################################################## ##################################################
# #
......
...@@ -340,7 +340,7 @@ tmq_t* build_consumer() { ...@@ -340,7 +340,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq); assert(tmq);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
return tmq; return tmq;
...@@ -367,7 +367,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -367,7 +367,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) { if (tmqmessage) {
/*msg_process(tmqmessage);*/ /*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage); taos_free_result(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
} }
...@@ -400,7 +400,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog ...@@ -400,7 +400,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqmessage);*/ /*msg_process(tmqmessage);*/
} }
tmq_message_destroy(tmqmessage); taos_free_result(tmqmessage);
} else { } else {
break; break;
} }
......
...@@ -31,46 +31,46 @@ ...@@ -31,46 +31,46 @@
#define NC "\033[0m" #define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
typedef struct { typedef struct {
TdThread thread; TdThread thread;
int32_t consumerId; int32_t consumerId;
int32_t ifCheckData; int32_t ifCheckData;
int64_t expectMsgCnt; int64_t expectMsgCnt;
int64_t consumeMsgCnt;
int32_t checkresult;
char topicString[1024]; int64_t consumeMsgCnt;
char keyString[1024]; int32_t checkresult;
int32_t numOfTopic; char topicString[1024];
char topics[32][64]; char keyString[1024];
int32_t numOfKey; int32_t numOfTopic;
char key[32][64]; char topics[32][64];
char value[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
tmq_t* tmq; tmq_t* tmq;
tmq_list_t* topicList; tmq_list_t* topicList;
} SThreadInfo; } SThreadInfo;
typedef struct { typedef struct {
// input from argvs // input from argvs
char dbName[32]; char dbName[32];
int32_t showMsgFlag; int32_t showMsgFlag;
int32_t consumeDelay; // unit s int32_t consumeDelay; // unit s
int32_t numOfThread; int32_t numOfThread;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL; TdFilePtr g_fp = NULL;
// char* g_pRowValue = NULL; // char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL; // TdFilePtr g_fp = NULL;
...@@ -95,7 +95,7 @@ void initLogFile() { ...@@ -95,7 +95,7 @@ void initLogFile() {
TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
if (NULL == pFile) { if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
exit -1; exit - 1;
}; };
g_fp = pFile; g_fp = pFile;
...@@ -103,27 +103,27 @@ void initLogFile() { ...@@ -103,27 +103,27 @@ void initLogFile() {
struct tm tm = *taosLocalTime(&tTime, NULL); struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(pFile, "###################################################################\n"); taosFprintfFile(pFile, "###################################################################\n");
taosFprintfFile(pFile, "# configDir: %s\n", configDir); taosFprintfFile(pFile, "# configDir: %s\n", configDir);
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName); taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(pFile, " Topics: "); taosFprintfFile(pFile, " Topics: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) { for (int i = 0; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]); taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
} }
taosFprintfFile(pFile, "\n"); taosFprintfFile(pFile, "\n");
taosFprintfFile(pFile, " Key: "); taosFprintfFile(pFile, " Key: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) { for (int i = 0; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]); taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
} }
taosFprintfFile(pFile, "\n"); taosFprintfFile(pFile, "\n");
} }
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
taosFprintfFile(pFile, "###################################################################\n"); taosFprintfFile(pFile, "###################################################################\n");
} }
...@@ -180,23 +180,23 @@ void ltrim(char* str) { ...@@ -180,23 +180,23 @@ void ltrim(char* str) {
// return str; // return str;
} }
static int running = 1; static int running = 1;
static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) { static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) {
char buf[1024]; char buf[1024];
//printf("topic: %s\n", tmq_get_topic_name(msg)); // printf("topic: %s\n", tmq_get_topic_name(msg));
//printf("vg:%d\n", tmq_get_vgroup_id(msg)); // printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable); taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
//taos_print_row(buf, row, fields, numOfFields); // taos_print_row(buf, row, fields, numOfFields);
//printf("%s\n", buf); // printf("%s\n", buf);
//taosFprintfFile(g_fp, "%s\n", buf); // taosFprintfFile(g_fp, "%s\n", buf);
} }
} }
...@@ -213,7 +213,7 @@ int queryDB(TAOS* taos, char* command) { ...@@ -213,7 +213,7 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
void build_consumer(SThreadInfo *pInfo) { void build_consumer(SThreadInfo* pInfo) {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
...@@ -233,11 +233,11 @@ void build_consumer(SThreadInfo *pInfo) { ...@@ -233,11 +233,11 @@ void build_consumer(SThreadInfo *pInfo) {
for (int32_t i = 0; i < pInfo->numOfKey; i++) { for (int32_t i = 0; i < pInfo->numOfKey; i++) {
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
} }
pInfo->tmq = tmq_consumer_new(pConn, conf, NULL, 0); pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
return; return;
} }
void build_topic_list(SThreadInfo *pInfo) { void build_topic_list(SThreadInfo* pInfo) {
pInfo->topicList = tmq_list_new(); pInfo->topicList = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1"); // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < pInfo->numOfTopic; i++) { for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
...@@ -246,48 +246,45 @@ void build_topic_list(SThreadInfo *pInfo) { ...@@ -246,48 +246,45 @@ void build_topic_list(SThreadInfo *pInfo) {
return; return;
} }
int32_t saveConsumeResult(SThreadInfo *pInfo) { int32_t saveConsumeResult(SThreadInfo* pInfo) {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)", sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)", g_stConfInfo.dbName,
g_stConfInfo.dbName, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->checkresult);
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->checkresult);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
return 0; return 0;
} }
void loop_consume(SThreadInfo *pInfo) { void loop_consume(SThreadInfo* pInfo) {
tmq_resp_err_t err; tmq_resp_err_t err;
int64_t totalMsgs = 0; int64_t totalMsgs = 0;
//int64_t totalRows = 0; // int64_t totalRows = 0;
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) { if (tmqMsg) {
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqMsg, totalMsgs, 0); msg_process(tmqMsg, totalMsgs, 0);
} }
tmq_message_destroy(tmqMsg); taos_free_result(tmqMsg);
totalMsgs++; totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) { if (totalMsgs >= pInfo->expectMsgCnt) {
break; break;
} }
...@@ -295,7 +292,7 @@ void loop_consume(SThreadInfo *pInfo) { ...@@ -295,7 +292,7 @@ void loop_consume(SThreadInfo *pInfo) {
break; break;
} }
} }
err = tmq_consumer_close(pInfo->tmq); err = tmq_consumer_close(pInfo->tmq);
if (err) { if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
...@@ -303,35 +300,34 @@ void loop_consume(SThreadInfo *pInfo) { ...@@ -303,35 +300,34 @@ void loop_consume(SThreadInfo *pInfo) {
} }
pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeMsgCnt = totalMsgs;
} }
void *consumeThreadFunc(void *param) { void* consumeThreadFunc(void* param) {
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
SThreadInfo *pInfo = (SThreadInfo *)param; SThreadInfo* pInfo = (SThreadInfo*)param;
build_consumer(pInfo); build_consumer(pInfo);
build_topic_list(pInfo); build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){ if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
return NULL; return NULL;
} }
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) { if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
loop_consume(pInfo); loop_consume(pInfo);
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
if (err) { if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1; pInfo->consumeMsgCnt = -1;
return NULL; return NULL;
} }
// save consume result into consumeresult table // save consume result into consumeresult table
saveConsumeResult(pInfo); saveConsumeResult(pInfo);
...@@ -343,7 +339,7 @@ void parseConsumeInfo() { ...@@ -343,7 +339,7 @@ void parseConsumeInfo() {
const char delim[2] = ","; const char delim[2] = ",";
const char ch = ':'; const char ch = ':';
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
token = strtok(g_stConfInfo.stThreads[i].topicString, delim); token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
...@@ -351,10 +347,10 @@ void parseConsumeInfo() { ...@@ -351,10 +347,10 @@ void parseConsumeInfo() {
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.stThreads[i].numOfTopic++; g_stConfInfo.stThreads[i].numOfTopic++;
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
token = strtok(g_stConfInfo.stThreads[i].keyString, delim); token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
...@@ -368,7 +364,7 @@ void parseConsumeInfo() { ...@@ -368,7 +364,7 @@ void parseConsumeInfo() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]); // g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++; g_stConfInfo.stThreads[i].numOfKey++;
} }
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
} }
...@@ -376,46 +372,47 @@ void parseConsumeInfo() { ...@@ -376,46 +372,47 @@ void parseConsumeInfo() {
int32_t getConsumeInfo() { int32_t getConsumeInfo() {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName); sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
int num_fields = taos_num_fields(pRes); int num_fields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes); TAOS_FIELD* fields = taos_fetch_fields(pRes);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
// ifcheckdata int
int32_t numOfThread = 0; int32_t numOfThread = 0;
while ((row = taos_fetch_row(pRes))) { while ((row = taos_fetch_row(pRes))) {
int32_t* lengths = taos_fetch_lengths(pRes); int32_t* lengths = taos_fetch_lengths(pRes);
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) { if (row[i] == NULL || 0 == i) {
continue; continue;
} }
if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]); g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
} else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
} else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]); g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]); g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
} }
} }
numOfThread ++; numOfThread++;
} }
g_stConfInfo.numOfThread = numOfThread; g_stConfInfo.numOfThread = numOfThread;
...@@ -426,7 +423,6 @@ int32_t getConsumeInfo() { ...@@ -426,7 +423,6 @@ int32_t getConsumeInfo() {
return 0; return 0;
} }
int main(int32_t argc, char* argv[]) { int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
getConsumeInfo(); getConsumeInfo();
...@@ -438,18 +434,19 @@ int main(int32_t argc, char* argv[]) { ...@@ -438,18 +434,19 @@ int main(int32_t argc, char* argv[]) {
// pthread_create one thread to consume // pthread_create one thread to consume
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i]))); taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
(void*)(&(g_stConfInfo.stThreads[i])));
} }
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
} }
//printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, "\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册