提交 eb1b3c49 编写于 作者: S slzhou

Merge branch '3.0' into 3.0_udfd

......@@ -88,6 +88,12 @@ def pre_test(){
cmake .. > /dev/null
make -j4> /dev/null
'''
sh'''
cd ${WKPY}
git reset --hard
git pull
pip3 install .
'''
return 1
}
......@@ -97,6 +103,7 @@ pipeline {
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine'
WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
}
stages {
stage('pre_build'){
......@@ -117,6 +124,11 @@ pipeline {
./test-all.sh b1fq
'''
sh'''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
sh'''
cd ${WKC}/debug
ctest
'''
......
......@@ -141,7 +141,7 @@ int32_t create_topic() {
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) {
printf("commit %d\n", resp);
}
......@@ -163,7 +163,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", "abc1");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
return tmq;
}
......@@ -189,7 +189,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
cnt++;
/*printf("get data\n");*/
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
}
......@@ -219,7 +219,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
......@@ -249,7 +249,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
batchCnt++;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
} else {
break;
}
......
......@@ -92,38 +92,14 @@ typedef struct taosField {
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct TAOS_BIND {
int buffer_type;
void *buffer;
uintptr_t buffer_length; // unused
uintptr_t *length;
int *is_null;
int is_unsigned; // unused
int *error; // unused
union {
int64_t ts;
int8_t b;
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
unsigned char *bin;
char *nchar;
} u;
unsigned int allocated;
} TAOS_BIND;
typedef struct TAOS_MULTI_BIND {
typedef struct TAOS_BIND_v2 {
int buffer_type;
void *buffer;
uintptr_t buffer_length;
int32_t buffer_length;
int32_t *length;
char *is_null;
int num;
} TAOS_MULTI_BIND;
} TAOS_BIND_v2;
typedef enum {
SET_CONF_RET_SUCC = 0,
......@@ -152,34 +128,34 @@ DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
......@@ -241,17 +217,17 @@ typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
// typedef struct tmq_message_t tmq_message_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
#if 1
#if 0
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
#endif
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
......@@ -295,14 +271,19 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO
#if 0
DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res);
#endif
#if 0
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic);
DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic);
#endif
DLL_EXPORT void tmq_message_destroy(TAOS_RES *res);
#endif
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
......
......@@ -331,6 +331,8 @@ typedef struct {
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
char user[TSDB_USER_LEN];
char passwd[TSDB_PASSWORD_LEN];
int64_t startTime;
} SConnectReq;
......@@ -482,7 +484,7 @@ typedef struct {
char intervalUnit;
char slidingUnit;
char
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision;
int64_t interval;
int64_t sliding;
......
......@@ -684,6 +684,43 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
return TSDB_CODE_SUCCESS;
}
/**
* @brief The invoker is responsible for memory alloc/dealloc.
*
* @param pBuilder
* @param pBuf Output buffer of STSRow
*/
static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
pBuilder->pBuf = (STSRow *)pBuf;
if (!pBuilder->pBuf) {
TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
TASSERT(pBuilder->nBitmaps > 0 && pBuilder->flen > 0);
uint32_t len = 0;
switch (pBuilder->rowType) {
case TD_ROW_TP:
#ifdef TD_SUPPORT_BITMAP
pBuilder->pBitmap = tdGetBitmapAddrTp(pBuilder->pBuf, pBuilder->flen);
#endif
break;
case TD_ROW_KV:
#ifdef TD_SUPPORT_BITMAP
pBuilder->pBitmap = tdGetBitmapAddrKv(pBuilder->pBuf, pBuilder->nBoundCols);
#endif
break;
default:
TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
*
......
......@@ -283,12 +283,12 @@ typedef struct SVgDataBlocks {
} SVgDataBlocks;
typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SVnodeModifOpStmt;
typedef struct SExplainOptions {
......
......@@ -21,6 +21,15 @@ extern "C" {
#endif
#include "querynodes.h"
#include "query.h"
typedef struct SStmtCallback {
TAOS_STMT* pStmt;
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
int32_t (*setBindInfoFn)(TAOS_STMT*, STableMeta*, void*);
int32_t (*setExecInfoFn)(TAOS_STMT*, SHashObj*, SHashObj*);
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
} SStmtCallback;
typedef struct SParseContext {
uint64_t requestId;
......@@ -34,6 +43,7 @@ typedef struct SParseContext {
char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg
struct SCatalog *pCatalog;
SStmtCallback *pStmtCb;
} SParseContext;
typedef struct SCmdMsgInfo {
......@@ -66,11 +76,27 @@ typedef struct SQuery {
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
bool isInsertSql(const char* pStr, size_t length);
void qDestroyQuery(SQuery* pQueryNode);
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
int32_t qResetStmtDataBlock(void* block, bool keepBuf);
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc);
void qFreeStmtDataBlock(void* pDataBlock);
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc);
void qDestroyStmtDataBlock(void* pBlock);
int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum);
int32_t qBuildStmtColFields(void *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields);
int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields);
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen);
#ifdef __cplusplus
}
#endif
......
......@@ -30,6 +30,7 @@ typedef struct SPlanContext {
SNode* pAstRoot;
bool topicQuery;
bool streamQuery;
bool rSmaQuery;
bool showRewrite;
int8_t triggerType;
int64_t watermark;
......@@ -45,7 +46,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
typedef TAOS_MULTI_BIND TAOS_BIND_v2; // todo remove
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_BIND_v2* pParams);
// Convert to subplan to string for the scheduler to send to the executor
......
......@@ -132,6 +132,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222)
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223)
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224)
#define TSDB_CODE_TSC_STMT_API_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225)
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227)
// mnode-common
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
......
......@@ -30,6 +30,15 @@ bin_dir="/usr/local/taos/bin"
service_config_dir="/etc/systemd/system"
#taos-tools para
demoName="taosdemo"
benchmarkName="taosBenchmark"
dumpName="taosdump"
emailName="taosdata.com"
taosName="taos"
toolsName="taostools"
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
......@@ -230,8 +239,20 @@ function install_header() {
# temp install taosBenchmark
function install_taosTools() {
cd ${script_dir}/taos-tools/
tar xvf taosTools-1.4.1-Linux-x64.tar.gz && cd taosTools-1.4.1/ && ./install-taostools.sh
${csudo} rm -f ${bin_link_dir}/${benchmarkName} || :
${csudo} rm -f ${bin_link_dir}/${dumpName} || :
${csudo} rm -f ${bin_link_dir}/rm${toolsName} || :
${csudo} /usr/bin/install -c -m 755 ${script_dir}/bin/${dumpName} ${install_main_dir}/bin/${dumpName}
${csudo} /usr/bin/install -c -m 755 ${script_dir}/bin/${benchmarkName} ${install_main_dir}/bin/${benchmarkName}
${csudo} ln -sf ${install_main_dir}/bin/${benchmarkName} ${install_main_dir}/bin/${demoName}
#Make link
[[ -x ${install_main_dir}/bin/${benchmarkName} ]] && \
${csudo} ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || :
[[ -x ${install_main_dir}/bin/${demoName} ]] && \
${csudo} ln -s ${install_main_dir}/bin/${demoName} ${bin_link_dir}/${demoName} || :
[[ -x ${install_main_dir}/bin/${dumpName} ]] && \
${csudo} ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName} || :
}
function add_newHostname_to_hosts() {
......
......@@ -39,7 +39,7 @@ cd ${compile_dir}
echo "compile_dir: ${compile_dir}"
cmake ..
cmake .. -DBUILD_TOOLS=true
make -j32
release_dir="${top_dir}/release"
......@@ -55,7 +55,6 @@ mkdir -p ${install_dir}
mkdir -p ${install_dir}/bin
mkdir -p ${install_dir}/lib
mkdir -p ${install_dir}/inc
mkdir -p ${install_dir}/taos-tools
install_files="${script_dir}/install.sh"
chmod a+x ${script_dir}/install.sh || :
......@@ -64,13 +63,14 @@ cp ${install_files} ${install_dir}
header_files="${top_dir}/include/client/taos.h ${top_dir}/include/util/taoserror.h"
cp ${header_files} ${install_dir}/inc
bin_files="${compile_dir}/build/bin/taosd ${compile_dir}/build/bin/taos ${compile_dir}/build/bin/create_table ${compile_dir}/build/bin/tmq_sim ${script_dir}/remove.sh"
cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
bin_files="${compile_dir}/source/dnode/mgmt/taosd ${compile_dir}/tools/shell/taos ${compile_dir}/tests/test/c/create_table ${compile_dir}/tests/test/c/tmq_sim ${script_dir}/remove.sh ${compile_dir}/build/bin/taosBenchmark ${compile_dir}/build/bin/taosdump"
cp -rf ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
cp -rf ${compile_dir}/source/client/libtaos.so ${install_dir}/lib/
cp -rf ${compile_dir}/source/libs/tdb/libtdb.so ${install_dir}/lib/
cp -rf ${compile_dir}/build/lib/libavro* ${install_dir}/lib/ > /dev/null || echo -e "failed to copy avro libraries"
cp -rf ${compile_dir}/build/lib/pkgconfig ${install_dir}/lib/ > /dev/null || echo -e "failed to copy pkgconfig directory"
cp ${compile_dir}/build/lib/libtaos.so ${install_dir}/lib/
cp ${compile_dir}/build/lib/libtdb.so ${install_dir}/lib/
taostoolfile="${top_dir}/tools/taosTools-1.4.1-Linux-x64.tar.gz"
cp ${taostoolfile} ${install_dir}/taos-tools
#cp ${compile_dir}/source/dnode/mnode/impl/libmnode.so ${install_dir}/lib/
#cp ${compile_dir}/source/dnode/qnode/libqnode.so ${install_dir}/lib/
......
......@@ -205,7 +205,8 @@ typedef struct SRequestObj {
char* sqlstr; // sql string
int32_t sqlLen;
int64_t self;
char* msgBuf; // error msg buffer
char* msgBuf;
int32_t msgBufLen;
int32_t code;
SArray* dbList;
SArray* tableList;
......@@ -278,7 +279,8 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
......@@ -302,6 +304,8 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq
void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
#ifdef __cplusplus
}
#endif
......
......@@ -19,6 +19,9 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "catalog.h"
typedef void STableDataBlocks;
typedef enum {
STMT_TYPE_INSERT = 1,
......@@ -26,17 +29,64 @@ typedef enum {
STMT_TYPE_QUERY,
} STMT_TYPE;
typedef enum {
STMT_INIT = 1,
STMT_PREPARE,
STMT_SETTBNAME,
STMT_SETTAGS,
STMT_FETCH_TAG_FIELDS,
STMT_FETCH_COL_FIELDS,
STMT_BIND,
STMT_BIND_COL,
STMT_ADD_BATCH,
STMT_EXECUTE,
} STMT_STATUS;
typedef struct SStmtTableCache {
STableDataBlocks* pDataBlock;
void* boundTags;
} SStmtTableCache;
typedef struct SStmtBindInfo {
bool needParse;
uint64_t tbUid;
uint64_t tbSuid;
int32_t sBindRowNum;
int32_t sBindLastIdx;
int8_t tbType;
void* boundTags;
char* tbName;
SName sname;
} SStmtBindInfo;
typedef struct SStmtExecInfo {
SRequestObj* pRequest;
SHashObj* pVgHash;
SHashObj* pBlockHash;
} SStmtExecInfo;
typedef struct SStmtSQLInfo {
STMT_TYPE type;
STMT_STATUS status;
bool autoCreate;
uint64_t runTimes;
SHashObj* pTableCache; //SHash<SStmtTableCache>
SQuery* pQuery;
char* sqlStr;
int32_t sqlLen;
} SStmtSQLInfo;
typedef struct STscStmt {
STMT_TYPE type;
//int16_t last;
//STscObj* taos;
//SSqlObj* pSql;
//SMultiTbStmt mtb;
//SNormalStmt normal;
//int numOfRows;
STscObj* taos;
SCatalog* pCatalog;
int32_t affectedRows;
SStmtSQLInfo sql;
SStmtExecInfo exec;
SStmtBindInfo bInfo;
} STscStmt;
#define STMT_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
......@@ -44,16 +94,16 @@ typedef struct STscStmt {
TAOS_STMT *stmtInit(TAOS *taos);
int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt);
char *stmtErrstr(TAOS_STMT *stmt);
const char *stmtErrstr(TAOS_STMT *stmt);
int stmtAffectedRows(TAOS_STMT *stmt);
int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind);
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName);
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND_v2 *tags);
int stmtIsInsert(TAOS_STMT *stmt, int *insert);
int stmtGetParamNum(TAOS_STMT *stmt, int *nums);
int stmtAddBatch(TAOS_STMT *stmt);
TAOS_RES *stmtUseResult(TAOS_STMT *stmt);
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
int stmtBindBatch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int32_t colIdx);
#ifdef __cplusplus
......
......@@ -186,6 +186,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
pRequest->pTscObj = pObj;
pRequest->body.fp = fp; // not used it yet
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
tsem_init(&pRequest->body.rspSem, 0, 0);
registerRequest(pRequest);
......
......@@ -146,7 +146,8 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
(*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen;
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, sizeof((*pRequest)->self))) {
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
sizeof((*pRequest)->self))) {
destroyRequest(*pRequest);
*pRequest = NULL;
tscError("put request to request hash failed");
......@@ -157,7 +158,7 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
return TSDB_CODE_SUCCESS;
}
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {
......@@ -170,6 +171,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
......@@ -262,7 +264,8 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO && precision != TSDB_TIME_PRECISION_NANO) {
if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
precision != TSDB_TIME_PRECISION_NANO) {
return;
}
......@@ -274,7 +277,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res);
pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
......@@ -298,15 +301,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code;
}
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = parseSql(pRequest, false, &pQuery);
}
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
if (TSDB_CODE_SUCCESS == code) {
switch (pQuery->execMode) {
......@@ -331,7 +327,10 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
}
taosArrayDestroy(pNodeList);
qDestroyQuery(pQuery);
if (!keepQuery) {
qDestroyQuery(pQuery);
}
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
}
......@@ -339,6 +338,18 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
return pRequest;
}
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = parseSql(pRequest, false, &pQuery, NULL);
}
return launchQueryImpl(pRequest, pQuery, code, false);
}
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
SCatalog* pCatalog = NULL;
int32_t code = 0;
......@@ -383,7 +394,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t code = 0;
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
pRequest = execQueryImpl(pTscObj, sql, sqlLen);
pRequest = launchQuery(pTscObj, sql, sqlLen);
if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
......@@ -512,6 +523,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = taosMemoryMalloc(contLen);
......@@ -715,6 +728,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
ASSERT(len <= bytes);
ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));
varDataSetLen(p, len);
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
......@@ -733,51 +747,55 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
}
pResultInfo->convertBuf[i] = p;
int32_t len = 0;
int32_t len = 0;
SResultColumn* pCol = &pResultInfo->pCol[i];
for (int32_t j = 0; j < numOfRows; ++j) {
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
int32_t jsonInnerType = *pStart;
char *jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
if(jsonInnerType == TSDB_DATA_TYPE_NULL){
char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if(jsonInnerType == TSDB_DATA_TYPE_JSON){
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
int32_t length =
taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
if (length <= 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, varDataVal(jsonInnerData));
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
varDataVal(jsonInnerData));
length = 0;
}
varDataSetLen(dst, length);
}else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst) + CHAR_BYTES);
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
varDataVal(dst) + CHAR_BYTES);
if (length <= 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, varDataVal(jsonInnerData));
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
varDataVal(jsonInnerData));
length = 0;
}
varDataSetLen(dst, length + CHAR_BYTES*2);
varDataSetLen(dst, length + CHAR_BYTES * 2);
*(char*)(varDataVal(dst), length + CHAR_BYTES) = '\"';
}else if(jsonInnerType == TSDB_DATA_TYPE_DOUBLE){
} else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(jsonInnerData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if(jsonInnerType == TSDB_DATA_TYPE_BIGINT){
} else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(jsonInnerData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if(jsonInnerType == TSDB_DATA_TYPE_BOOL){
sprintf(varDataVal(dst), "%s", (*((char *)jsonInnerData) == 1) ? "true" : "false");
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else {
} else {
ASSERT(0);
}
if(len + varDataTLen(dst) > colLength[i]){
if (len + varDataTLen(dst) > colLength[i]) {
p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -128,6 +128,10 @@ const char *taos_errstr(TAOS_RES *res) {
}
void taos_free_result(TAOS_RES *res) {
if (NULL == res) {
return;
}
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
destroyRequest(pRequest);
......@@ -579,84 +583,111 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) {
return stmtInit(taos);
}
int taos_stmt_close(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
if (stmt == NULL || sql == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtClose(stmt);
return stmtPrepare(stmt, sql, length);
}
int taos_stmt_execute(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags) {
if (stmt == NULL || name == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtExec(stmt);
int32_t code = stmtSetTbName(stmt, name);
if (code) {
return code;
}
if (tags) {
return stmtSetTbTags(stmt, tags);
}
return TSDB_CODE_SUCCESS;
}
char *taos_stmt_errstr(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
if (stmt == NULL || name == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
return terrno;
}
return stmtErrstr(stmt);
return stmtSetTbName(stmt, name);
}
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
if (stmt == NULL) {
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return 0;
return terrno;
}
return stmtAffectedRows(stmt);
if (bind->num > 1) {
tscError("invalid bind number %d for %s", bind->num, __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind, -1);
}
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) {
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBind(stmt, bind);
if (bind->num <= 0 || bind->num > INT16_MAX) {
tscError("invalid bind num %d", bind->num);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind, -1);
}
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
if (stmt == NULL || sql == NULL) {
int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtPrepare(stmt, sql, length);
if (colIdx < 0) {
tscError("invalid bind column idx %d", colIdx);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind, colIdx);
}
int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) {
if (stmt == NULL || name == NULL || tags == NULL) {
int taos_stmt_add_batch(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtSetTbNameTags(stmt, name, tags);
return stmtAddBatch(stmt);
}
int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
if (stmt == NULL || name == NULL) {
int taos_stmt_execute(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtSetTbNameTags(stmt, name, NULL);
return stmtExec(stmt);
}
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
......@@ -679,34 +710,38 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
return stmtGetParamNum(stmt, nums);
}
int taos_stmt_add_batch(TAOS_STMT *stmt) {
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
return NULL;
}
return stmtAddBatch(stmt);
return stmtUseResult(stmt);
}
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
char *taos_stmt_errstr(TAOS_STMT *stmt) {
return (char *)stmtErrstr(stmt);
}
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
return 0;
}
return stmtUseResult(stmt);
return stmtAffectedRows(stmt);
}
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
if (stmt == NULL || bind == NULL) {
int taos_stmt_close(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind);
return stmtClose(stmt);
}
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
......
......@@ -4,86 +4,539 @@
#include "clientStmt.h"
#include "tdef.h"
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
switch (newStatus) {
case STMT_SETTBNAME:
break;
default:
break;
}
//STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
pStmt->sql.status = newStatus;
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
if (NULL == pStmt->bInfo.tbName) {
tscError("no table name set");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
}
*tbName = pStmt->bInfo.tbName;
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.boundTags = tags;
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->exec.pVgHash = pVgHash;
pStmt->exec.pBlockHash = pBlockHash;
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
*pVgHash = pStmt->exec.pVgHash;
*pBlockHash = pStmt->exec.pBlockHash;
return TSDB_CODE_SUCCESS;
}
int32_t stmtCacheBlock(STscStmt *pStmt) {
if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
return TSDB_CODE_SUCCESS;
}
uint64_t uid;
if (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) {
uid = pStmt->bInfo.tbSuid;
} else {
ASSERT(TSDB_NORMAL_TABLE == pStmt->bInfo.tbType);
uid = pStmt->bInfo.tbUid;
}
if (taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid))) {
return TSDB_CODE_SUCCESS;
}
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid));
STableDataBlocks* pDst = NULL;
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));
SStmtTableCache cache = {
.pDataBlock = pDst,
.boundTags = pStmt->bInfo.boundTags,
};
if (taosHashPut(pStmt->sql.pTableCache, &uid, sizeof(uid), &cache, sizeof(cache))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pStmt->bInfo.boundTags = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t stmtParseSql(STscStmt* pStmt) {
SStmtCallback stmtCb = {
.pStmt = pStmt,
.getTbNameFn = stmtGetTbName,
.setBindInfoFn = stmtSetBindInfo,
.setExecInfoFn = stmtSetExecInfo,
.getExecInfoFn = stmtGetExecInfo,
};
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
pStmt->bInfo.needParse = false;
switch (nodeType(pStmt->sql.pQuery->pRoot)) {
case QUERY_NODE_VNODE_MODIF_STMT:
if (0 == pStmt->sql.type) {
pStmt->sql.type = STMT_TYPE_INSERT;
}
break;
case QUERY_NODE_SELECT_STMT:
pStmt->sql.type = STMT_TYPE_QUERY;
break;
default:
tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
}
STMT_ERR_RET(stmtCacheBlock(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
pStmt->bInfo.tbUid = 0;
pStmt->bInfo.tbSuid = 0;
pStmt->bInfo.tbType = 0;
pStmt->bInfo.needParse = true;
taosMemoryFreeClear(pStmt->bInfo.tbName);
destroyBoundColumnInfo(pStmt->bInfo.boundTags);
taosMemoryFreeClear(pStmt->bInfo.boundTags);
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
uint64_t *key = taosHashGetKey(pIter, NULL);
if (keepTable && (*key == pStmt->bInfo.tbUid)) {
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
continue;
}
qFreeStmtDataBlock(pBlocks);
taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key));
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
}
if (keepTable) {
return TSDB_CODE_SUCCESS;
}
taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
taosMemoryFree(pStmt->sql.sqlStr);
qDestroyQuery(pStmt->sql.pQuery);
void *pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
while (pIter) {
SStmtTableCache* pCache = (SStmtTableCache*)pIter;
qDestroyStmtDataBlock(pCache->pDataBlock);
destroyBoundColumnInfo(pCache->boundTags);
pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
}
taosHashCleanup(pStmt->sql.pTableCache);
pStmt->sql.pTableCache = NULL;
memset(&pStmt->sql, 0, sizeof(pStmt->sql));
STMT_ERR_RET(stmtCleanExecInfo(pStmt, false));
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.needParse = true;
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pStmt->pCatalog) {
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
}
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));
if (pTableMeta->uid == pStmt->bInfo.tbUid) {
pStmt->bInfo.needParse = false;
return TSDB_CODE_SUCCESS;
}
if (taosHashGet(pStmt->exec.pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid))) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
if (NULL == pCache) {
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", pTableMeta->uid);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
pStmt->bInfo.needParse = false;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.boundTags = pCache->boundTags;
return TSDB_CODE_SUCCESS;
}
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
if (pCache) {
pStmt->bInfo.needParse = false;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.boundTags = pCache->boundTags;
STableDataBlocks* pNewBlock = NULL;
STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock));
if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid), &pNewBlock, POINTER_BYTES)) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
int32_t stmtResetStmt(STscStmt* pStmt) {
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == pStmt->sql.pTableCache) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
STMT_ERR_RET(terrno);
}
pStmt->sql.status = STMT_INIT;
return TSDB_CODE_SUCCESS;
}
TAOS_STMT *stmtInit(TAOS *taos) {
STscObj* pObj = (STscObj*)taos;
STscStmt* pStmt = NULL;
#if 0
pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
if (pStmt == NULL) {
if (NULL == pStmt) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to allocate memory for statement");
return NULL;
}
pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
free(pStmt);
pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == pStmt->sql.pTableCache) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to allocate memory for statement");
taosMemoryFree(pStmt);
return NULL;
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
free(pSql);
free(pStmt);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("failed to malloc payload buffer");
return NULL;
pStmt->taos = pObj;
pStmt->bInfo.needParse = true;
pStmt->sql.status = STMT_INIT;
return pStmt;
}
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt->sql.status >= STMT_PREPARE) {
STMT_ERR_RET(stmtResetStmt(pStmt));
}
tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA;
pStmt->pSql = pSql;
pStmt->last = STMT_INIT;
pStmt->numOfRows = 0;
registerSqlObj(pSql);
#endif
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));
return pStmt;
}
if (length <= 0) {
length = strlen(sql);
}
pStmt->sql.sqlStr = strndup(sql, length);
pStmt->sql.sqlLen = length;
int stmtClose(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS;
}
int stmtExec(TAOS_STMT *stmt) {
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
STMT_ERR_RET(stmtGetFromCache(pStmt));
if (pStmt->bInfo.needParse) {
taosMemoryFree(pStmt->bInfo.tbName);
pStmt->bInfo.tbName = strdup(tbName);
}
return TSDB_CODE_SUCCESS;
}
char *stmtErrstr(TAOS_STMT *stmt) {
return NULL;
}
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND_v2 *tags) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, &pStmt->bInfo.sname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
int stmtAffectedRows(TAOS_STMT *stmt) {
return TSDB_CODE_SUCCESS;
}
int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind) {
int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_TAG_FIELDS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query tag fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBuildStmtTagFields(*pDataBlock, pStmt->bInfo.boundTags, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
return TSDB_CODE_SUCCESS;
int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_COL_FIELDS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));
return TSDB_CODE_SUCCESS;
}
int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) {
int stmtBindBatch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int32_t colIdx) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (colIdx < 0) {
qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
} else {
if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
tscError("bind column index not in sequence");
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
pStmt->bInfo.sBindLastIdx = colIdx;
if (0 == colIdx) {
pStmt->bInfo.sBindRowNum = bind->num;
}
qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum);
}
return TSDB_CODE_SUCCESS;
}
int stmtIsInsert(TAOS_STMT *stmt, int *insert) {
int stmtAddBatch(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
STMT_ERR_RET(stmtCacheBlock(pStmt));
return TSDB_CODE_SUCCESS;
}
int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
int stmtExec(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);
STMT_ERR_JRET(pStmt->exec.pRequest->code);
pStmt->affectedRows += taos_affected_rows(pStmt->exec.pRequest);
_return:
stmtCleanExecInfo(pStmt, (code ? false : true));
++pStmt->sql.runTimes;
STMT_RET(code);
}
int stmtClose(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_RET(stmtCleanSQLInfo(pStmt));
}
const char *stmtErrstr(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL) {
return (char*) tstrerror(terrno);
}
if (pStmt->exec.pRequest) {
pStmt->exec.pRequest->code = terrno;
}
return taos_errstr(pStmt->exec.pRequest);
}
int stmtAffectedRows(TAOS_STMT *stmt) {
return ((STscStmt*)stmt)->affectedRows;
}
int stmtIsInsert(TAOS_STMT *stmt, int *insert) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt->sql.type) {
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
} else {
*insert = isInsertSql(pStmt->sql.sqlStr, 0);
}
return TSDB_CODE_SUCCESS;
}
int stmtAddBatch(TAOS_STMT *stmt) {
int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
return TSDB_CODE_SUCCESS;
}
......@@ -91,9 +544,5 @@ TAOS_RES *stmtUseResult(TAOS_STMT *stmt) {
return NULL;
}
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
return TSDB_CODE_SUCCESS;
}
......@@ -24,6 +24,7 @@
#include "tqueue.h"
#include "tref.h"
#if 0
struct tmq_message_t {
SMqPollRsp msg;
char* topic;
......@@ -31,6 +32,7 @@ struct tmq_message_t {
int32_t vgId;
int32_t resIter;
};
#endif
typedef struct {
int8_t tmqRspType;
......@@ -52,9 +54,7 @@ struct tmq_topic_vgroup_t {
};
struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
SArray container; // SArray<tmq_topic_vgroup_t*>
};
struct tmq_conf_t {
......@@ -63,6 +63,7 @@ struct tmq_conf_t {
int8_t autoCommit;
int8_t resetOffset;
uint16_t port;
uint16_t autoCommitInterval;
char* ip;
char* user;
char* pass;
......@@ -202,6 +203,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcmp(key, "auto.commit.interval.ms") == 0) {
conf->autoCommitInterval = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "auto.offset.reset") == 0) {
if (strcmp(value, "none") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
......@@ -300,7 +306,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
if (pParam->tmq->commit_cb) {
pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL);
pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL);
}
if (!pParam->async) tsem_post(&pParam->rspSem);
return 0;
......@@ -322,6 +328,7 @@ tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
return tmq_subscribe(tmq, lst);
}
#if 0
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
if (pTmq == NULL) {
......@@ -357,8 +364,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return pTmq;
}
#endif
tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) {
return NULL;
......@@ -369,6 +377,7 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
ASSERT(user);
ASSERT(pass);
ASSERT(conf->db);
ASSERT(conf->groupId[0]);
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) return NULL;
......@@ -429,8 +438,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
req.num = pArray->size;
req.offsets = pArray->pData;
} else {
req.num = offsets->cnt;
req.offsets = (SMqOffset*)offsets->elems;
req.num = taosArrayGetSize(&offsets->container);
req.offsets = (SMqOffset*)offsets->container.pData;
}
SCoder encoder;
......@@ -615,7 +624,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
int32_t code = 0;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
// todo check for invalid sql statement and return with error code
......@@ -1538,16 +1547,6 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
}
#endif
#if 0
void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
SMqPollRsp* pRsp = &tmq_message->msg;
tDeleteSMqConsumeRsp(pRsp);
/*taosMemoryFree(tmq_message);*/
taosFreeQitem(tmq_message);
}
#endif
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
const char* tmq_err2str(tmq_resp_err_t err) {
......
......@@ -108,7 +108,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
taos_free_result(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
......@@ -141,7 +141,7 @@ TEST(testCase, tmq_subscribe_stb_Test) {
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
taos_free_result(msg);
//printf("get msg\n");
}
}
......
......@@ -2756,6 +2756,8 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
if (tEncodeI32(&encoder, pReq->pid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->passwd) < 0) return -1;
if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1;
tEndEncode(&encoder);
......@@ -2773,6 +2775,8 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->passwd) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1;
tEndDecode(&decoder);
......@@ -3032,7 +3036,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
return 0;
}
int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -3052,7 +3055,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -3085,7 +3088,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
return 0;
}
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......
......@@ -519,8 +519,9 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
SKvRowIdx *pIdx = tdKvRowColIdxAt(pRow, rcol);
int16_t colIdx = -1;
if (pIdx) {
colIdx = POINTER_DISTANCE(pRow->data, pIdx) / sizeof(SKvRowIdx);
colIdx = POINTER_DISTANCE(pIdx, TD_ROW_COL_IDX(pRow)) / sizeof(SKvRowIdx);
}
TASSERT(colIdx >= 0);
SCellVal sVal = {0};
if (pIdx->colId == pDataCol->colId) {
if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pIdx->offset, colIdx) < 0) {
......
......@@ -119,10 +119,10 @@ _OVER:
}
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
SDnodeTrans * pTrans = &pDnode->trans;
tmsg_t msgType = pMsg->msgType;
bool isReq = msgType & 1u;
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
if (msgType == TDMT_DND_SERVER_STATUS) {
......@@ -443,7 +443,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen);
void * pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......@@ -523,4 +523,4 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
.pWrapper = pWrapper,
};
return msgCb;
}
\ No newline at end of file
}
......@@ -29,6 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen);
#ifdef __cplusplus
}
#endif
......
......@@ -24,20 +24,20 @@
#include "version.h"
typedef struct {
uint32_t id;
int8_t connType;
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time
int32_t pid; // pid of app that invokes taosc
uint32_t ip;
uint16_t port;
int8_t killed;
int64_t loginTimeMs;
int64_t lastAccessTimeMs;
uint64_t killId;
int32_t numOfQueries;
SArray *pQueries; //SArray<SQueryDesc>
uint32_t id;
int8_t connType;
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time
int32_t pid; // pid of app that invokes taosc
uint32_t ip;
uint16_t port;
int8_t killed;
int64_t loginTimeMs;
int64_t lastAccessTimeMs;
uint64_t killId;
int32_t numOfQueries;
SArray * pQueries; // SArray<SQueryDesc>
} SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
......@@ -45,7 +45,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
static void * mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq);
static int32_t mndProcessConnectReq(SNodeMsg *pReq);
......@@ -71,9 +71,9 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
return 0;
......@@ -91,7 +91,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
char connStr[255] = {0};
char connStr[255] = {0};
int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
int32_t connId = mndGenerateUid(connStr, len);
if (startTime == 0) startTime = taosGetTimestampMs();
......@@ -174,10 +174,10 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
}
static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
SUserObj *pUser = NULL;
SDbObj *pDb = NULL;
SConnObj *pConn = NULL;
SMnode * pMnode = pReq->pNode;
SUserObj * pUser = NULL;
SDbObj * pDb = NULL;
SConnObj * pConn = NULL;
int32_t code = -1;
SConnectReq connReq = {0};
char ip[30] = {0};
......@@ -194,6 +194,11 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
goto CONN_OVER;
}
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
mError("user:%s, failed to auth while acquire user\n %s \r\n %s", pReq->user, connReq.passwd, pUser->pass);
code = TSDB_CODE_RPC_AUTH_FAILURE;
goto CONN_OVER;
}
if (connReq.db[0]) {
char db[TSDB_DB_FNAME_LEN];
......@@ -253,8 +258,8 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn->pQueries = pBasic->queryDesc;
pBasic->queryDesc = NULL;
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
return TSDB_CODE_SUCCESS;
}
......@@ -324,9 +329,10 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
return NULL;
}
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) {
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
SClientHbBatchRsp *pBatchRsp) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query;
......@@ -335,8 +341,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
rpcGetConnInfo(pMsg->handle, &connInfo);
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
pBasic->pid, pBasic->app, 0);
if (pConn == NULL) {
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
return -1;
......@@ -345,7 +352,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
}
} else if (pConn->killed) {
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
mndReleaseConn(pMnode, pConn);
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1;
}
......@@ -369,8 +376,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
}
rspBasic->connId = pConn->id;
rspBasic->totalDnodes = 1; //TODO
rspBasic->onlineDnodes = 1; //TODO
rspBasic->totalDnodes = 1; // TODO
rspBasic->onlineDnodes = 1; // TODO
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
mndReleaseConn(pMnode, pConn);
......@@ -379,7 +386,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) {
taosArrayPush(pBatchRsp->rsps, &hbRsp);
taosArrayPush(pBatchRsp->rsps, &hbRsp);
return TSDB_CODE_SUCCESS;
}
......@@ -396,7 +403,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO: {
void *rspMsg = NULL;
void * rspMsg = NULL;
int32_t rspLen = 0;
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
......@@ -406,7 +413,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
break;
}
case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL;
void * rspMsg = NULL;
int32_t rspLen = 0;
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
......@@ -457,7 +464,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
void *buf = rpcMallocCont(tlen);
void * buf = rpcMallocCont(tlen);
tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
......@@ -479,7 +486,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
}
static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
SMnode * pMnode = pReq->pNode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
......@@ -513,7 +520,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
}
static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
SMnode * pMnode = pReq->pNode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
......@@ -545,11 +552,11 @@ static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
}
static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SMnode * pMnode = pReq->pNode;
int32_t numOfRows = 0;
SConnObj *pConn = NULL;
int32_t cols = 0;
char *pWrite;
char * pWrite;
char ipStr[TSDB_IPv4ADDR_LEN + 6];
if (pShow->pIter == NULL) {
......@@ -604,8 +611,8 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int
}
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode;
int32_t numOfRows = 0;
SMnode *pMnode = pReq->pNode;
int32_t numOfRows = 0;
#if 0
SConnObj *pConn = NULL;
int32_t cols = 0;
......@@ -703,7 +710,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i
}
pShow->numOfRows += numOfRows;
#endif
#endif
return numOfRows;
}
......
......@@ -34,6 +34,54 @@
extern bool tsStreamSchedV;
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) {
SNode* pAst = NULL;
SQueryPlan* pPlan = NULL;
terrno = TSDB_CODE_SUCCESS;
if (nodesStringToNode(ast, &pAst) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.rSmaQuery = true,
.triggerType = triggerType,
.watermark = watermark,
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
int32_t opNum = LIST_LENGTH(inner->pNodeList);
if (opNum != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
if (qSubPlanToString(plan, pStr, pLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
END:
if (pAst) nodesDestroyNode(pAst);
if (pPlan) nodesDestroyNode(pPlan);
return terrno;
}
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
......
......@@ -30,8 +30,15 @@ int32_t MndTestProfile::connId;
TEST_F(MndTestProfile, 01_ConnectMsg) {
SConnectReq connectReq = {0};
connectReq.pid = 1234;
char passwd[] = "taosdata";
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt);
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "");
strcpy(connectReq.user, "root");
strcpy(connectReq.passwd, secretEncrypt);
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = rpcMallocCont(contLen);
......@@ -58,10 +65,16 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
}
TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
char passwd[] = "taosdata";
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt);
SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "invalid_db");
strcpy(connectReq.user, "root");
strcpy(connectReq.passwd, secretEncrypt);
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = rpcMallocCont(contLen);
......
......@@ -54,10 +54,16 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
}
TEST_F(MndTestShow, 03_ShowMsg_Conn) {
char passwd[] = "taosdata";
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt);
SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_show");
strcpy(connectReq.db, "");
strcpy(connectReq.user, "root");
strcpy(connectReq.passwd, secretEncrypt);
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = rpcMallocCont(contLen);
......@@ -74,4 +80,4 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
TEST_F(MndTestShow, 04_ShowMsg_Cluster) {
test.SendShowReq(TSDB_MGMT_TABLE_CLUSTER, "cluster", "");
EXPECT_EQ(test.GetShowRows(), 1);
}
\ No newline at end of file
}
......@@ -109,8 +109,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows);
// need to reposition
......
......@@ -82,16 +82,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false;
}
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
// currently only rows are used
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
return 0;
}
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t sversion = 0;
......@@ -112,7 +103,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
int32_t numOfRows = pHandle->pBlock->numOfRows;
*pNumOfRows = pHandle->pBlock->numOfRows;
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
......@@ -120,10 +111,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colNumNeed = pSchemaWrapper->nCols;
}
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (pArray == NULL) {
return NULL;
*ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (*ppCols == NULL) {
return -1;
}
int32_t colMeta = 0;
int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
......@@ -136,21 +128,24 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colNeed++;
} else {
SColumnInfoData colInfo = {0};
/*int sz = numOfRows * pColSchema->bytes;*/
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type;
if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
if (colInfoDataEnsureCapacity(&colInfo, 0, *pNumOfRows) < 0) {
goto FAIL;
}
taosArrayPush(pArray, &colInfo);
taosArrayPush(*ppCols, &colInfo);
colMeta++;
colNeed++;
}
}
int32_t colActual = taosArrayGetSize(*ppCols);
// TODO in stream shuffle case, fetch groupId
*pGroupId = 0;
STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema);
STSRow* row;
......@@ -159,22 +154,22 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
// get all wanted col of that block
int32_t colTot = taosArrayGetSize(pArray);
for (int32_t i = 0; i < colTot; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
for (int32_t i = 0; i < colActual; i++) {
SColumnInfoData* pColData = taosArrayGet(*ppCols, i);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break;
}
/*if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {*/
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
goto FAIL;
}
}
curRow++;
}
return pArray;
return 0;
FAIL:
taosArrayDestroy(*ppCols);
return -1;
}
void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
......
......@@ -89,7 +89,7 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
(int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
}
static FORCE_INLINE void getSTSRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx,
static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx,
int32_t *toffset, col_id_t *colIdx) {
col_id_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) {
......@@ -131,7 +131,6 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs);
void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
void destroyBlockArrayList(SArray* pDataBlockList);
void destroyBlockHashmap(SHashObj* pDataBlockHash);
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
......@@ -139,5 +138,7 @@ int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks);
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq);
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
#endif // TDENGINE_DATABLOCKMGT_H
......@@ -21,6 +21,8 @@ extern "C" {
#endif
#include "parser.h"
#include "parToken.h"
#include "parUtil.h"
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
......
此差异已折叠。
......@@ -18,6 +18,7 @@
#include "catalog.h"
#include "parUtil.h"
#include "querynodes.h"
#include "parInt.h"
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
......@@ -102,7 +103,13 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs) {
}
}
void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
void destroyBoundColumnInfo(void* pBoundInfo) {
if (NULL == pBoundInfo) {
return;
}
SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo;
taosMemoryFreeClear(pColList->boundColumns);
taosMemoryFreeClear(pColList->cols);
taosMemoryFreeClear(pColList->colIdxInfo);
......@@ -149,7 +156,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
int32_t len = tSerializeSVCreateTbReq(NULL, pCreateTbReq);
if (pBlocks->nAllocSize - pBlocks->size < len) {
pBlocks->nAllocSize += len + pBlocks->rowSize;
......@@ -506,6 +513,28 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
return TSDB_CODE_SUCCESS;
}
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
// expand the allocated size
if (remain < allSize) {
pDataBlock->nAllocSize = (pDataBlock->size + allSize) * 1.5;
char *tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
if (tmp != NULL) {
pDataBlock->pData = tmp;
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
} else {
// do nothing, if allocate more memory failed
pDataBlock->nAllocSize = nAllocSizeOld;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
const int factor = 5;
......@@ -541,3 +570,84 @@ int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo
pColInfo->boundNullLen);
return TSDB_CODE_SUCCESS;
}
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
STableDataBlocks* pBlock = (STableDataBlocks*)block;
if (keepBuf) {
taosMemoryFreeClear(pBlock->pData);
pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
if (NULL == pBlock->pData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
} else {
pBlock->pData = NULL;
}
pBlock->ordered = true;
pBlock->prevTS = INT64_MIN;
pBlock->size = sizeof(SSubmitBlk);
pBlock->tsSource = -1;
pBlock->numOfTables = 1;
pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
pBlock->headerSize = pBlock->size;
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
return TSDB_CODE_SUCCESS;
}
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
*pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
if (NULL == *pDst) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
((STableDataBlocks*)(*pDst))->cloned = true;
return qResetStmtDataBlock(*pDst, false);
}
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) {
int32_t code = qCloneStmtDataBlock(pDst, pSrc);
if (code) {
return code;
}
STableDataBlocks *pBlock = (STableDataBlocks*)*pDst;
pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
if (NULL == pBlock->pData) {
qFreeStmtDataBlock(pBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
return TSDB_CODE_SUCCESS;
}
void qFreeStmtDataBlock(void* pDataBlock) {
if (pDataBlock == NULL) {
return;
}
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
taosMemoryFreeClear(pDataBlock);
}
void qDestroyStmtDataBlock(void* pBlock) {
if (pBlock == NULL) {
return;
}
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
pDataBlock->cloned = false;
destroyDataBlock(pDataBlock);
}
......@@ -18,7 +18,11 @@
#include "parInt.h"
#include "parToken.h"
static bool isInsertSql(const char* pStr, size_t length) {
bool isInsertSql(const char* pStr, size_t length) {
if (NULL == pStr) {
return false;
}
int32_t index = 0;
do {
......@@ -68,4 +72,4 @@ void qDestroyQuery(SQuery* pQueryNode) {
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
return extractResultSchema(pRoot, numOfCols, pSchema);
}
\ No newline at end of file
}
......@@ -716,7 +716,8 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false");
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), varDataVal(input));
int32_t len = MIN(varDataLen(input), outputLen - VARSTR_HEADER_SIZE);
len = sprintf(varDataVal(output), "%.*s", len, varDataVal(input));
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
//not support
......
......@@ -158,6 +158,7 @@ typedef struct {
char secured : 2;
char spi : 2;
char user[TSDB_UNI_LEN];
uint64_t ahandle; // ahandle assigned by client
uint32_t code; // del later
uint32_t msgType;
......@@ -186,23 +187,23 @@ typedef enum { Normal, Quit, Release, Register } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
......
......@@ -614,35 +614,16 @@ void cliSend(SCliConn* pConn) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
int msgLen = transMsgLenFromCont(pMsg->contLen);
if (!pConn->secured) {
char* buf = taosMemoryCalloc(1, msgLen + sizeof(STransUserMsg));
memcpy(buf, (char*)pHead, msgLen);
STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen);
memcpy(uMsg->user, pTransInst->user, tListLen(uMsg->user));
memcpy(uMsg->secret, pTransInst->secret, tListLen(uMsg->secret));
// to avoid mem leak
destroyUserdata(pMsg);
pMsg->pCont = (char*)buf + sizeof(STransMsgHead);
pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead);
pHead = (STransMsgHead*)buf;
pHead->secured = 1;
msgLen += sizeof(STransUserMsg);
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
......
......@@ -46,7 +46,6 @@ typedef struct SSrvConn {
struct sockaddr_in addr;
struct sockaddr_in locaddr;
char secured;
int spi;
char info[64];
char user[TSDB_UNI_LEN]; // user ID for the link
......@@ -104,6 +103,13 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status);
/*
* time-consuming task throwed into BG work thread
*/
static void uvWorkDoTask(uv_work_t* req);
static void uvWorkAfterTask(uv_work_t* req, int status);
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
......@@ -181,16 +187,16 @@ static void uvHandleReq(SSrvConn* pConn) {
uint32_t msgLen = pBuf->len;
STransMsgHead* pHead = (STransMsgHead*)msg;
if (pHead->secured == 1) {
STransUserMsg* uMsg = (STransUserMsg*)((char*)msg + msgLen - sizeof(STransUserMsg));
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
if (pHead->secured == 1) {
pHead->msgLen -= sizeof(STransUserMsg);
}
memcpy(pConn->user, pHead->user, strlen(pHead->user));
// TODO(dengyihao): time-consuming task throwed into BG Thread
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
// wreq->data = pConn;
// uv_read_stop((uv_stream_t*)pConn->pTcp);
// transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
CONN_SHOULD_RELEASE(pConn, pHead);
......@@ -344,12 +350,6 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pMsg->ahandle;
// pHead->secured = pMsg->code == 0 ? 1 : 0; //
if (!pConn->secured) {
pConn->secured = pMsg->code == 0 ? 1 : 0;
}
pHead->secured = pConn->secured;
if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1;
} else {
......@@ -464,6 +464,24 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
taosMemoryFree(req);
}
static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task
// only auth conn currently, add more func later
tTrace("server conn %p start to be processed in BG Thread", req->data);
return;
}
static void uvWorkAfterTask(uv_work_t* req, int status) {
if (status != 0) {
tTrace("server conn %p failed to processed ", req->data);
}
// Done time-consumeing task
// add more func later
// this func called in main loop
tTrace("server conn %p already processed ", req->data);
taosMemoryFree(req);
}
void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (status == -1) {
return;
......
......@@ -138,6 +138,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
// mnode-common
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
......
此差异已折叠。
......@@ -4,18 +4,14 @@
ROOT=./
TARGET=exe
LFLAGS = '-Wl,-rpath,/usr/local/taos/driver/' -ltaos -lpthread -lm -lrt
CFLAGS = -O0 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \
CFLAGS = -O0 -g -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \
-Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX \
-Wno-unused-function -D_M_X64 -I/usr/local/taos/include -std=gnu99 \
-fsanitize=address
-Wno-unused-function -D_M_X64 -I/usr/local/taos/include -std=gnu99 -Wno-sign-conversion
all: $(TARGET)
exe:
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
gcc $(CFLAGS) ./stmtBatchTest.c -o $(ROOT)stmtBatchTest $(LFLAGS)
gcc $(CFLAGS) ./stmtTest.c -o $(ROOT)stmtTest $(LFLAGS)
gcc $(CFLAGS) ./stmt_function.c -o $(ROOT)stmt_function $(LFLAGS)
clean:
rm $(ROOT)batchprepare
......
!/bin/bash
#!/bin/bash
##################################################
#
......
此差异已折叠。
import taos
import sys
from util.log import *
from util.sql import *
from util.cases import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
tdSql.execute("create stable db.stb1 (ts timestamp, c1 int, c2 int) tags(t0 tinyint, t1 int, t2 int)")
tdSql.execute("create stable db.stb2 (ts timestamp, c2 int, c3 binary(16)) tags(t2 binary(16), t3 binary(16), t4 int)")
maxRemainderNum=7
tbnum=101
for i in range(tbnum-1):
sql = f"create table db.t{i} using db.stb1 tags({i%maxRemainderNum}, {(i-1)%maxRemainderNum}, {i%2})"
tdSql.execute(sql)
tdSql.execute(f"insert into db.t{i} values (now-10d, {i}, {i%3})")
tdSql.execute(f"insert into db.t{i} values (now-9d, {i}, {(i-1)%3})")
tdSql.execute(f"insert into db.t{i} values (now-8d, {i}, {(i-2)%3})")
tdSql.execute(f"insert into db.t{i} (ts )values (now-7d)")
tdSql.execute(f"create table db.t0{i} using db.stb2 tags('{i%maxRemainderNum}', '{(i-1)%maxRemainderNum}', {i%3})")
tdSql.execute(f"insert into db.t0{i} values (now-10d, {i}, '{(i+1)%3}')")
tdSql.execute(f"insert into db.t0{i} values (now-9d, {i}, '{(i+2)%3}')")
tdSql.execute(f"insert into db.t0{i} values (now-8d, {i}, '{(i)%3}')")
tdSql.execute(f"insert into db.t0{i} (ts )values (now-7d)")
# tdSql.execute("create table db.t100num using db.stb1 tags(null, null, null)")
# tdSql.execute("create table db.t0100num using db.stb2 tags(null, null, null)")
# tdSql.execute(f"insert into db.t100num values (now-10d, {tbnum-1}, 1)")
# tdSql.execute(f"insert into db.t100num values (now-9d, {tbnum-1}, 0)")
# tdSql.execute(f"insert into db.t100num values (now-8d, {tbnum-1}, 2)")
# tdSql.execute(f"insert into db.t100num (ts )values (now-7d)")
# tdSql.execute(f"insert into db.t0100num values (now-10d, {tbnum-1}, 1)")
# tdSql.execute(f"insert into db.t0100num values (now-9d, {tbnum-1}, 0)")
# tdSql.execute(f"insert into db.t0100num values (now-8d, {tbnum-1}, 2)")
# tdSql.execute(f"insert into db.t0100num (ts )values (now-7d)")
#========== distinct multi-data-coloumn ==========
# tdSql.query(f"select distinct c1 from stb1 where c1 <{tbnum}")
# tdSql.checkRows(tbnum)
# tdSql.query(f"select distinct c2 from stb1")
# tdSql.checkRows(4)
# tdSql.query(f"select distinct c1,c2 from stb1 where c1 <{tbnum}")
# tdSql.checkRows(tbnum*3)
# tdSql.query(f"select distinct c1,c1 from stb1 where c1 <{tbnum}")
# tdSql.checkRows(tbnum)
# tdSql.query(f"select distinct c1,c2 from stb1 where c1 <{tbnum} limit 3")
# tdSql.checkRows(3)
# tdSql.query(f"select distinct c1,c2 from stb1 where c1 <{tbnum} limit 3 offset {tbnum*3-2}")
# tdSql.checkRows(2)
tdSql.query(f"select distinct c1 from t1 where c1 <{tbnum}")
tdSql.checkRows(1)
tdSql.query(f"select distinct c2 from t1")
tdSql.checkRows(4)
tdSql.query(f"select distinct c1,c2 from t1 where c1 <{tbnum}")
tdSql.checkRows(3)
tdSql.query(f"select distinct c1,c1 from t1 ")
tdSql.checkRows(2)
tdSql.query(f"select distinct c1,c1 from t1 where c1 <{tbnum}")
tdSql.checkRows(1)
tdSql.query(f"select distinct c1,c2 from t1 where c1 <{tbnum} limit 3")
tdSql.checkRows(3)
tdSql.query(f"select distinct c1,c2 from t1 where c1 <{tbnum} limit 3 offset 2")
tdSql.checkRows(1)
# tdSql.query(f"select distinct c3 from stb2 where c2 <{tbnum} ")
# tdSql.checkRows(3)
# tdSql.query(f"select distinct c3, c2 from stb2 where c2 <{tbnum} limit 2")
# tdSql.checkRows(2)
# tdSql.error("select distinct c5 from stb1")
tdSql.error("select distinct c5 from t1")
tdSql.error("select distinct c1 from db.*")
tdSql.error("select c2, distinct c1 from stb1")
tdSql.error("select c2, distinct c1 from t1")
tdSql.error("select distinct c2 from ")
tdSql.error("distinct c2 from stb1")
tdSql.error("distinct c2 from t1")
tdSql.error("select distinct c1, c2, c3 from stb1")
tdSql.error("select distinct c1, c2, c3 from t1")
tdSql.error("select distinct stb1.c1, stb1.c2, stb2.c2, stb2.c3 from stb1")
tdSql.error("select distinct stb1.c1, stb1.c2, stb2.c2, stb2.c3 from t1")
tdSql.error("select distinct t1.c1, t1.c2, t2.c1, t2.c2 from t1")
# tdSql.query(f"select distinct c1 c2, c2 c3 from stb1 where c1 <{tbnum}")
# tdSql.checkRows(tbnum*3)
tdSql.query(f"select distinct c1 c2, c2 c3 from t1 where c1 <{tbnum}")
tdSql.checkRows(3)
# tdSql.error("select distinct c1, c2 from stb1 order by ts")
tdSql.error("select distinct c1, c2 from t1 order by ts")
# tdSql.error("select distinct c1, ts from stb1 group by c2")
tdSql.error("select distinct c1, ts from t1 group by c2")
# tdSql.error("select distinct c1, max(c2) from stb1 ")
tdSql.error("select distinct c1, max(c2) from t1 ")
# tdSql.error("select max(c2), distinct c1 from stb1 ")
tdSql.error("select max(c2), distinct c1 from t1 ")
# tdSql.error("select distinct c1, c2 from stb1 where c1 > 3 group by t0")
tdSql.error("select distinct c1, c2 from t1 where c1 > 3 group by t0")
# tdSql.error("select distinct c1, c2 from stb1 where c1 > 3 interval(1d) ")
tdSql.error("select distinct c1, c2 from t1 where c1 > 3 interval(1d) ")
# tdSql.error("select distinct c1, c2 from stb1 where c1 > 3 interval(1d) fill(next)")
tdSql.error("select distinct c1, c2 from t1 where c1 > 3 interval(1d) fill(next)")
# tdSql.error("select distinct c1, c2 from stb1 where ts > now-10d and ts < now interval(1d) fill(next)")
tdSql.error("select distinct c1, c2 from t1 where ts > now-10d and ts < now interval(1d) fill(next)")
# tdSql.error("select distinct c1, c2 from stb1 where c1 > 3 slimit 1")
# tdSql.error("select distinct c1, c2 from t1 where c1 > 3 slimit 1")
# tdSql.query(f"select distinct c1, c2 from stb1 where c1 between {tbnum-2} and {tbnum} ")
# tdSql.checkRows(6)
tdSql.query(f"select distinct c1, c2 from t1 where c1 between {tbnum-2} and {tbnum} ")
# tdSql.checkRows(1)
# tdSql.query("select distinct c1, c2 from stb1 where c1 in (1,2,3,4,5)")
# tdSql.checkRows(15)
tdSql.query("select distinct c1, c2 from t1 where c1 in (1,2,3,4,5)")
# tdSql.checkRows(1)
# tdSql.query("select distinct c1, c2 from stb1 where c1 in (100,1000,10000)")
# tdSql.checkRows(3)
tdSql.query("select distinct c1, c2 from t1 where c1 in (100,1000,10000)")
# tdSql.checkRows(0)
# tdSql.query(f"select distinct c1,c2 from (select * from stb1 where c1 > {tbnum-2}) ")
# tdSql.checkRows(3)
# tdSql.query(f"select distinct c1,c2 from (select * from t1 where c1 < {tbnum}) ")
# tdSql.checkRows(3)
# tdSql.query(f"select distinct c1,c2 from (select * from stb1 where t2 !=0 and t2 != 1) ")
# tdSql.checkRows(0)
# tdSql.error("select distinct c1, c2 from (select distinct c1, c2 from stb1 where t0 > 2 and t1 < 3) ")
# tdSql.error("select c1, c2 from (select distinct c1, c2 from stb1 where t0 > 2 and t1 < 3) ")
# tdSql.query("select distinct c1, c2 from (select c2, c1 from stb1 where c1 > 2 ) where c1 < 4")
# tdSql.checkRows(3)
# tdSql.error("select distinct c1, c2 from (select c1 from stb1 where t0 > 2 ) where t1 < 3")
# tdSql.error("select distinct c1, c2 from (select c2, c1 from stb1 where c1 > 2 order by ts)")
# tdSql.error("select distinct c1, c2 from (select c2, c1 from t1 where c1 > 2 order by ts)")
# tdSql.error("select distinct c1, c2 from (select c2, c1 from stb1 where c1 > 2 group by c1)")
# tdSql.error("select distinct c1, c2 from (select max(c1) c1, max(c2) c2 from stb1 group by c1)")
# tdSql.error("select distinct c1, c2 from (select max(c1) c1, max(c2) c2 from t1 group by c1)")
# tdSql.query("select distinct c1, c2 from (select max(c1) c1, max(c2) c2 from stb1 )")
# tdSql.checkRows(1)
# tdSql.query("select distinct c1, c2 from (select max(c1) c1, max(c2) c2 from t1 )")
# tdSql.checkRows(1)
# tdSql.error("select distinct stb1.c1, stb1.c2 from stb1 , stb2 where stb1.ts=stb2.ts and stb1.t2=stb2.t4")
# tdSql.error("select distinct t1.c1, t1.c2 from t1 , t2 where t1.ts=t2.ts ")
# tdSql.error("select distinct c1, c2 from (select count(c1) c1, count(c2) c2 from stb1 group by ts)")
# tdSql.error("select distinct c1, c2 from (select count(c1) c1, count(c2) c2 from t1 group by ts)")
# #========== suport distinct multi-tags-coloumn ==========
# tdSql.query("select distinct t1 from stb1")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t0, t1 from stb1")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t1, t0 from stb1")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t1, t2 from stb1")
# tdSql.checkRows(maxRemainderNum*2+1)
# tdSql.query("select distinct t0, t1, t2 from stb1")
# tdSql.checkRows(maxRemainderNum*2+1)
# tdSql.query("select distinct t0 t1, t1 t2 from stb1")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t0, t0, t0 from stb1")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t0, t1 from t1")
# tdSql.checkRows(1)
# tdSql.query("select distinct t0, t1 from t100num")
# tdSql.checkRows(1)
# tdSql.query("select distinct t3 from stb2")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t2, t3 from stb2")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t3, t2 from stb2")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t4, t2 from stb2")
# tdSql.checkRows(maxRemainderNum*3+1)
# tdSql.query("select distinct t2, t3, t4 from stb2")
# tdSql.checkRows(maxRemainderNum*3+1)
# tdSql.query("select distinct t2 t1, t3 t2 from stb2")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t3, t3, t3 from stb2")
# tdSql.checkRows(maxRemainderNum+1)
# tdSql.query("select distinct t2, t3 from t01")
# tdSql.checkRows(1)
# tdSql.query("select distinct t3, t4 from t0100num")
# tdSql.checkRows(1)
# ########## should be error #########
# tdSql.error("select distinct from stb1")
# tdSql.error("select distinct t3 from stb1")
# tdSql.error("select distinct t1 from db.*")
# tdSql.error("select distinct t2 from ")
# tdSql.error("distinct t2 from stb1")
# tdSql.error("select distinct stb1")
# tdSql.error("select distinct t0, t1, t2, t3 from stb1")
# tdSql.error("select distinct stb1.t0, stb1.t1, stb2.t2, stb2.t3 from stb1")
# tdSql.error("select dist t0 from stb1")
# tdSql.error("select distinct stb2.t2, stb2.t3 from stb1")
# tdSql.error("select distinct stb2.t2 t1, stb2.t3 t2 from stb1")
# tdSql.error("select distinct t0, t1 from t1 where t0 < 7")
# ########## add where condition ##########
# tdSql.query("select distinct t0, t1 from stb1 where t1 > 3")
# tdSql.checkRows(3)
# tdSql.query("select distinct t0, t1 from stb1 where t1 > 3 limit 2")
# tdSql.checkRows(2)
# tdSql.query("select distinct t0, t1 from stb1 where t1 > 3 limit 2 offset 2")
# tdSql.checkRows(1)
# tdSql.query("select distinct t0, t1 from stb1 where t1 > 3 slimit 2")
# tdSql.checkRows(3)
# tdSql.error("select distinct t0, t1 from stb1 where c1 > 2")
# tdSql.query("select distinct t0, t1 from stb1 where t1 > 3 and t1 < 5")
# tdSql.checkRows(1)
# tdSql.error("select distinct stb1.t0, stb1.t1 from stb1, stb2 where stb1.t2=stb2.t4")
# tdSql.error("select distinct t0, t1 from stb1 where stb2.t4 > 2")
# tdSql.error("select distinct t0, t1 from stb1 where t1 > 3 group by t0")
# tdSql.error("select distinct t0, t1 from stb1 where t1 > 3 interval(1d) ")
# tdSql.error("select distinct t0, t1 from stb1 where t1 > 3 interval(1d) fill(next)")
# tdSql.error("select distinct t0, t1 from stb1 where ts > now-10d and ts < now interval(1d) fill(next)")
# tdSql.error("select max(c1), distinct t0 from stb1 where t0 > 2")
# tdSql.error("select distinct t0, max(c1) from stb1 where t0 > 2")
# tdSql.error("select distinct t0 from stb1 where t0 in (select t0 from stb1 where t0 > 2)")
# tdSql.query("select distinct t0, t1 from stb1 where t0 in (1,2,3,4,5)")
# tdSql.checkRows(5)
# tdSql.query("select distinct t1 from (select t0, t1 from stb1 where t0 > 2) ")
# tdSql.checkRows(4)
# tdSql.error("select distinct t1 from (select distinct t0, t1 from stb1 where t0 > 2 and t1 < 3) ")
# tdSql.error("select distinct t1 from (select distinct t0, t1 from stb1 where t0 > 2 ) where t1 < 3")
# tdSql.query("select distinct t1 from (select t0, t1 from stb1 where t0 > 2 ) where t1 < 3")
# tdSql.checkRows(1)
# tdSql.error("select distinct t1, t0 from (select t1 from stb1 where t0 > 2 ) where t1 < 3")
# tdSql.error("select distinct t1, t0 from (select max(t1) t1, max(t0) t0 from stb1 group by t1)")
# tdSql.error("select distinct t1, t0 from (select max(t1) t1, max(t0) t0 from stb1)")
# tdSql.query("select distinct t1, t0 from (select t1,t0 from stb1 where t0 > 2 ) where t1 < 3")
# tdSql.checkRows(1)
# tdSql.error(" select distinct t1, t0 from (select t1,t0 from stb1 where t0 > 2 order by ts) where t1 < 3")
# tdSql.error("select t1, t0 from (select distinct t1,t0 from stb1 where t0 > 2 ) where t1 < 3")
# tdSql.error(" select distinct t1, t0 from (select t1,t0 from stb1 where t0 > 2 group by ts) where t1 < 3")
# tdSql.error("select distinct stb1.t1, stb1.t2 from stb1 , stb2 where stb1.ts=stb2.ts and stb1.t2=stb2.t4")
# tdSql.error("select distinct t1.t1, t1.t2 from t1 , t2 where t1.ts=t2.ts ")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
python3 ./test.py -f 2-query/between.py
python3 ./test.py -f 2-query/distinct.py
python3 ./test.py -f 2-query/varchar.py
......@@ -340,7 +340,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
......@@ -367,7 +367,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) {
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
}
......@@ -400,7 +400,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqmessage);*/
}
tmq_message_destroy(tmqmessage);
taos_free_result(tmqmessage);
} else {
break;
}
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册