提交 e70de288 编写于 作者: S Shuaiqiang Chang

Merge branch 'develop' into hotfix/taos-tools

* develop: (138 commits)
  [td-225]fix error in global variables.
  [td-225]fix error in global variables.
  [td-225] fix invalid read
  [td-688]
  [td-225] update the error code info
  [td-225] update the error code info
  [td-225] update the error code when system related error happens.
  Fix bug occured in regression test
  fix compile error in arm64
  [TD-543] fix coverity scans
  reference count
  Bump jackson-databind in /tests/comparisonTest/opentsdb/opentsdbtest
  [TD-543] coverity scan
  [td-225] add cancel query support, fix bugs in load file blocks when column has already been updated.
  implement format check for log
  change SIGINT to SIGKILL to make dnode5 be killed.
  scripts
  add comments
  free ID in the last step
  dont reset lockedBy
  ...
......@@ -260,19 +260,75 @@ matrix:
- cmake .. > /dev/null
- make > /dev/null
# - os: osx
# language: c
# compiler: clang
# env: DESC="mac/clang build"
# git:
# - depth: 1
# addons:
# homebrew:
# - cmake
#
# script:
# - cd ${TRAVIS_BUILD_DIR}
# - mkdir debug
# - cd debug
# - cmake .. > /dev/null
# - make > /dev/null
- os: linux
arch: arm64
dist: bionic
language: c
compiler: clang
env: DESC="linux/clang build"
git:
- depth: 1
addons:
apt:
packages:
- build-essential
- cmake
before_script:
- cd ${TRAVIS_BUILD_DIR}
- mkdir debug
- cd debug
script:
- if [ "${TRAVIS_CPU_ARCH}" == "arm64" ]; then
cmake .. -DCPUTYPE=aarch64 > /dev/null;
else
cmake .. > /dev/null;
fi
- make > /dev/null
- os: linux
arch: arm64
dist: trusty
language: c
git:
- depth: 1
addons:
apt:
packages:
- build-essential
- cmake
env:
- DESC="trusty/gcc-4.8 build"
before_script:
- cd ${TRAVIS_BUILD_DIR}
- mkdir debug
- cd debug
script:
- if [ "${TRAVIS_CPU_ARCH}" == "arm64" ]; then
cmake .. -DCPUTYPE=aarch64 > /dev/null;
else
cmake .. > /dev/null;
fi
- make > /dev/null
# - os: osx
# language: c
# compiler: clang
# env: DESC="mac/clang build"
# git:
# - depth: 1
# addons:
# homebrew:
# - cmake
#
# script:
# - cd ${TRAVIS_BUILD_DIR}
# - mkdir debug
# - cd debug
# - cmake .. > /dev/null
# - make > /dev/null
......@@ -53,7 +53,7 @@ STable从属于库,一个STable只属于一个库,但一个库可以有一
说明:
1. TAGS列总长度不能超过64k bytes;
1. TAGS列总长度不能超过16k bytes;
2. TAGS列的数据类型不能是timestamp;
3. TAGS列名不能与其他列名相同;
4. TAGS列名不能为预留关键字.
......
......@@ -22,7 +22,7 @@ New keyword "tags" is introduced, where tag_name is the tag name, and tag_type i
Note:
1. The bytes of all tags together shall be less than 64k
1. The bytes of all tags together shall be less than 16k
2. Tag's data type can not be time stamp
3. Tag name shall be different from the field name
4. Tag name shall not be the same as system keywords
......
......@@ -63,7 +63,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
| 3 | BIGINT | 8 | 长整型,范围 [-2^63+1, 2^63-1], -2^63用于NULL |
| 4 | FLOAT | 4 | 浮点型,有效位数6-7,范围 [-3.4E38, 3.4E38] |
| 5 | DOUBLE | 8 | 双精度浮点型,有效位数15-16,范围 [-1.7E308, 1.7E308] |
| 6 | BINARY | 自定义 | 用于记录字符串,理论上,最长可以有65526字节,但由于每行数据最多64K字节,实际上限一般小于理论值。 binary仅支持字符串输入,字符串两端使用单引号引用,否则英文全部自动转化为小写。使用时须指定大小,如binary(20)定义了最长为20个字符的字符串,每个字符占1byte的存储空间。如果用户字符串超出20字节将会报错。对于字符串内的单引号,可以用转义字符反斜线加单引号来表示, 即 **\’**。 |
| 6 | BINARY | 自定义 | 用于记录字符串,理论上,最长可以有16374字节,但由于每行数据最多16K字节,实际上限一般小于理论值。 binary仅支持字符串输入,字符串两端使用单引号引用,否则英文全部自动转化为小写。使用时须指定大小,如binary(20)定义了最长为20个字符的字符串,每个字符占1byte的存储空间。如果用户字符串超出20字节将会报错。对于字符串内的单引号,可以用转义字符反斜线加单引号来表示, 即 **\’**。 |
| 7 | SMALLINT | 2 | 短整型, 范围 [-32767, 32767], -32768用于NULL |
| 8 | TINYINT | 1 | 单字节整型,范围 [-127, 127], -128用于NULL |
| 9 | BOOL | 1 | 布尔型,{true, false} |
......@@ -106,7 +106,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```mysql
CREATE TABLE [IF NOT EXISTS] tb_name (timestamp_field_name TIMESTAMP, field1_name data_type1 [, field2_name data_type2 ...])
```
说明:1)表的第一个字段必须是TIMESTAMP,并且系统自动将其设为主键;2)表的每行长度不能超过64K字节;3)使用数据类型binary或nchar,需指定其最长的字节数,如binary(20),表示20字节。
说明:1)表的第一个字段必须是TIMESTAMP,并且系统自动将其设为主键;2)表的每行长度不能超过16K字节;3)使用数据类型binary或nchar,需指定其最长的字节数,如binary(20),表示20字节。
- **删除数据表**
......
......@@ -39,7 +39,7 @@ The full list of data types is listed below. For string types of data, we will
| 6 | DOUBLE | 8 | A standard nullable double float type with 15-16 significant digits and a range of [-1.7E308, 1.7E308]​ |
| 7 | BOOL | 1 | A nullable boolean type, [**`true`**, **`false`**] |
| 8 | TIMESTAMP | 8 | A nullable timestamp type with the same usage as the primary column timestamp |
| 9 | BINARY(*M*) | *M* | A nullable string type whose length is *M*, error should be threw with exceeded chars, the maximum length of *M* is 65526, but as maximum row size is 64K bytes, the actual upper limit will generally less than 65526. This type of string only supports ASCii encoded chars. |
| 9 | BINARY(*M*) | *M* | A nullable string type whose length is *M*, error should be threw with exceeded chars, the maximum length of *M* is 16374, but as maximum row size is 16K bytes, the actual upper limit will generally less than 16374. This type of string only supports ASCii encoded chars. |
| 10 | NCHAR(*M*) | 4 * *M* | A nullable string type whose length is *M*, error should be threw with exceeded chars. The **`NCHAR`** type supports Unicode encoded chars. |
All the keywords in a SQL statement are case-insensitive, but strings values are case-sensitive and must be quoted by a pair of `'` or `"`. To quote a `'` or a `"` , you can use the escape character `\`.
......@@ -86,7 +86,7 @@ All the keywords in a SQL statement are case-insensitive, but strings values are
1) The first column must be a `timestamp`, and the system will set it as the primary key.
2) The record size is limited to 64k bytes
2) The record size is limited to 16k bytes
3) For `binary` or `nchar` data types, the length must be specified. For example, binary(20) means a binary data type with 20 bytes.
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCSECONARYMERGE_H
#define TDENGINE_TSCSECONARYMERGE_H
#ifndef TDENGINE_TSCLOCALMERGE_H
#define TDENGINE_TSCLOCALMERGE_H
#ifdef __cplusplus
extern "C" {
......@@ -27,14 +27,7 @@ extern "C" {
#include "tsclient.h"
#define MAX_NUM_OF_SUBQUERY_RETRY 3
/*
* @version 0.1
* @date 2018/01/05
* @author liaohj
* management of client-side reducer for metric query
*/
struct SQLFunctionCtx;
typedef struct SLocalDataSource {
......@@ -60,7 +53,6 @@ typedef struct SLocalReducer {
char * prevRowOfInput;
tFilePage * pResultBuf;
int32_t nResultBufSize;
// char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
......@@ -81,13 +73,8 @@ typedef struct SLocalReducer {
} SLocalReducer;
typedef struct SSubqueryState {
/*
* the number of completed retrieval subquery, once this value equals to numOfVnodes,
* all retrieval are completed.Local merge is launched.
*/
int32_t numOfCompleted;
int32_t numOfTotal; // number of total sub-queries
int32_t code; // code from subqueries
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfTotal; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
......@@ -128,4 +115,4 @@ int32_t tscDoLocalMerge(SSqlObj *pSql);
}
#endif
#endif // TDENGINE_TSCSECONARYMERGE_H
#endif // TDENGINE_TSCLOCALMERGE_H
......@@ -26,11 +26,9 @@ extern "C" {
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSupporter* pSupporter);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);
......
......@@ -28,7 +28,7 @@ extern "C" {
#include "exception.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "tscSecondaryMerge.h"
#include "tscLocalMerge.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
......@@ -64,7 +64,8 @@ typedef struct SJoinSupporter {
SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
int64_t interval; // interval time
int64_t intervalTime; // interval time
int64_t slidingTime; // sliding time
SLimitVal limit; // limit info
uint64_t uid; // query meter uid
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution
......@@ -122,15 +123,13 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableI
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnSTable(SSqlCmd* pCmd);
bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
SSchema* pColSchema, int16_t isTag);
SSchema* pColSchema, int16_t colType);
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
......@@ -139,7 +138,7 @@ bool tscIsInsertData(char* sqlstr);
/* use for keep current db info temporarily, for handle table with db prefix */
// todo remove it
void tscGetDBInfoFromMeterId(char* tableId, char* db);
void tscGetDBInfoFromTableFullName(char* tableId, char* db);
int tscAllocPayload(SSqlCmd* pCmd, int size);
......@@ -253,7 +252,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t sub
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
......
......@@ -64,12 +64,20 @@ SSchema* tscGetTableSchema(const STableMeta* pTableMeta);
SSchema *tscGetTableTagSchema(const STableMeta *pMeta);
/**
*
* get the column schema according to the column index
* @param pMeta
* @param startCol
* @param colIndex
* @return
*/
SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
/**
* get the column schema according to the column id
* @param pTableMeta
* @param colId
* @return
*/
SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t startCol);
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
/**
* check if the schema is valid or not, including following aspects:
......
......@@ -85,7 +85,7 @@ typedef struct SSqlExpr {
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
......@@ -123,7 +123,7 @@ typedef struct SCond {
typedef struct SJoinNode {
char tableId[TSDB_TABLE_ID_LEN];
uint64_t uid;
int16_t tagCol;
int16_t tagColId;
} SJoinNode;
typedef struct SJoinInfo {
......@@ -155,20 +155,19 @@ typedef struct SParamInfo {
} SParamInfo;
typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_ID_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table
char tableId[TSDB_TABLE_ID_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table
uint32_t nAllocSize;
uint32_t headerSize; // header for metadata (submit metadata)
uint32_t headerSize; // header for table info (uid, tid, submit metadata)
uint32_t size;
/*
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref
* the table meta of table, the table meta will be used during submit, keep a ref
* to avoid it to be removed from cache
*/
STableMeta *pTableMeta;
......@@ -191,32 +190,28 @@ typedef struct SDataBlockList { // todo remove
} SDataBlockList;
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert/import type
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert/import type
char slidingTimeUnit;
STimeWindow window;
int64_t intervalTime; // aggregation time interval
int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*>
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SSqlExpr*>
SArray * exprList; // SArray<SSqlExpr*>
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
SOrderVal order;
int16_t fillType; // final result fill type
int16_t fillType; // final result fill type
int16_t numOfTables;
STableMetaInfo **pTableMetaInfo;
struct STSBuf * tsBuf;
int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
// offset value in the original sql expression, NOT sent to virtual node, only applied at client side
int64_t prjOffset;
int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
} SQueryInfo;
typedef struct {
......@@ -299,11 +294,12 @@ typedef struct STscObj {
} STscObj;
typedef struct SSqlObj {
void * signature;
STscObj *pTscObj;
void (*fp)();
void (*fetchFp)();
void * param;
void *signature;
STscObj *pTscObj;
void *SRpcReqContext;
void (*fp)();
void (*fetchFp)();
void *param;
int64_t stime;
uint32_t queryId;
void * pStream;
......@@ -431,7 +427,7 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
int32_t tscCompareTidTags(const void* p1, const void* p2);
void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus
}
......
......@@ -151,8 +151,8 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_dumpMemoryLeakImp
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_initImp(JNIEnv *env, jobject jobj, jstring jconfigDir) {
if (jconfigDir != NULL) {
const char *confDir = (*env)->GetStringUTFChars(env, jconfigDir, NULL);
if (confDir && strlen(configDir) != 0) {
strcpy(configDir, confDir);
if (confDir && strlen(confDir) != 0) {
tstrncpy(configDir, confDir, TSDB_FILENAME_LEN);
}
(*env)->ReleaseStringUTFChars(env, jconfigDir, confDir);
}
......@@ -227,12 +227,12 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
}
if (user == NULL) {
jniTrace("jobj:%p, user is null, use tsDefaultUser", jobj);
user = tsDefaultUser;
jniTrace("jobj:%p, user is null, use default user %s", jobj, TSDB_DEFAULT_USER);
user = TSDB_DEFAULT_USER;
}
if (pass == NULL) {
jniTrace("jobj:%p, pass is null, use tsDefaultPass", jobj);
pass = tsDefaultPass;
jniTrace("jobj:%p, pass is null, use default password", jobj);
pass = TSDB_DEFAULT_PASS;
}
/*
......@@ -252,8 +252,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
if (host != NULL) (*env)->ReleaseStringUTFChars(env, jhost, host);
if (dbname != NULL) (*env)->ReleaseStringUTFChars(env, jdbName, dbname);
if (user != NULL && user != tsDefaultUser) (*env)->ReleaseStringUTFChars(env, juser, user);
if (pass != NULL && pass != tsDefaultPass) (*env)->ReleaseStringUTFChars(env, jpass, pass);
if (user != NULL && user != (const char *)TSDB_DEFAULT_USER) (*env)->ReleaseStringUTFChars(env, juser, user);
if (pass != NULL && pass != (const char *)TSDB_DEFAULT_PASS) (*env)->ReleaseStringUTFChars(env, jpass, pass);
return ret;
}
......@@ -465,7 +465,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
int num_fields = taos_num_fields(result);
if (num_fields == 0) {
jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void *)res, num_fields);
jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void*)res, num_fields);
return JNI_NUM_OF_FIELDS_0;
}
......@@ -473,7 +473,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
if (row == NULL) {
int tserrno = taos_errno(result);
if (tserrno == 0) {
jniTrace("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void *)res, num_fields);
jniTrace("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, num_fields);
return JNI_FETCH_END;
} else {
jniTrace("jobj:%p, conn:%p, interruptted query", jobj, tscon);
......@@ -571,9 +571,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI
sub = (jlong)tsub;
if (sub == 0) {
jniTrace("jobj:%p, failed to subscribe: topic:%s", jobj, jtopic);
jniTrace("jobj:%p, failed to subscribe: topic:%s", jobj, topic);
} else {
jniTrace("jobj:%p, successfully subscribe: topic: %s", jobj, jtopic);
jniTrace("jobj:%p, successfully subscribe: topic: %s", jobj, topic);
}
if (topic != NULL) (*env)->ReleaseStringUTFChars(env, jtopic, topic);
......@@ -583,7 +583,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) {
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub);
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%lld", jobj, sub);
jniGetGlobalMethod(env);
TAOS_SUB *tsub = (TAOS_SUB *)sub;
......
......@@ -19,9 +19,8 @@
#include "tnote.h"
#include "trpc.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSubquery.h"
#include "tscSecondaryMerge.h"
#include "tscLocalMerge.h"
#include "tscUtil.h"
#include "tsched.h"
#include "tschemautil.h"
......@@ -46,7 +45,8 @@ int doAsyncParseSql(SSqlObj* pSql) {
int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to malloc payload");
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
tscQueueAsyncRes(pSql);
// tscQueueAsyncRes(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return code;
}
......@@ -211,7 +211,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE);
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscQueueAsyncRes(pSql);
return;
}
......@@ -285,7 +286,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
tscProcessSql(pSql);
} else {
SSchedMsg schedMsg;
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessFetchRow;
schedMsg.ahandle = pSql;
schedMsg.thandle = pRes->tsrow;
......@@ -368,7 +369,9 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
pSql->fp = pSql->fetchFp;
}
(*pSql->fp)(pSql->param, taosres, code);
if (pSql->fp) {
(*pSql->fp)(pSql->param, taosres, code);
}
if (shouldFree) {
tscTrace("%p sqlObj is automatically freed in async res", pSql);
......@@ -385,7 +388,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
int32_t* c = malloc(sizeof(int32_t));
*c = code;
SSchedMsg schedMsg;
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessAsyncError;
schedMsg.ahandle = fp;
schedMsg.thandle = param;
......@@ -401,7 +404,7 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
}
SSchedMsg schedMsg;
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessAsyncRes;
schedMsg.ahandle = pSql;
schedMsg.thandle = (void *)1;
......@@ -418,7 +421,7 @@ void tscProcessAsyncFree(SSchedMsg *pMsg) {
void tscQueueAsyncFreeResult(SSqlObj *pSql) {
tscTrace("%p sqlObj put in queue to async free", pSql);
SSchedMsg schedMsg;
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessAsyncFree;
schedMsg.ahandle = pSql;
schedMsg.thandle = (void *)1;
......
......@@ -153,7 +153,7 @@ typedef struct SRateInfo {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable) {
if (!isValidDataType(dataType, dataBytes)) {
if (!isValidDataType(dataType)) {
tscError("Illegal data type %d or data type length %d", dataType, dataBytes);
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -1853,26 +1853,14 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
static void last_row_finalizer(SQLFunctionCtx *pCtx) {
// do nothing at the first stage
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
if (pResInfo->hasResult != DATA_SET_FLAG) {
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
} else {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
}
return;
}
} else {
if (pResInfo->hasResult != DATA_SET_FLAG) {
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
} else {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
}
return;
if (pResInfo->hasResult != DATA_SET_FLAG) {
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
} else {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
}
return;
}
GET_RES_INFO(pCtx)->numOfRes = 1;
......@@ -2989,12 +2977,12 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
*/
static void tag_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType, true);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType, true);
}
static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SET_VAL(pCtx, 1, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType, true);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType, true);
}
static void copy_function(SQLFunctionCtx *pCtx) {
......@@ -3903,7 +3891,7 @@ static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
STSCompInfo *pInfo = pResInfo->interResultBuf;
pInfo->pTSBuf = tsBufCreate(false);
pInfo->pTSBuf = tsBufCreate(false, pCtx->order);
pInfo->pTSBuf->tsOrder = pCtx->order;
return true;
}
......@@ -3925,7 +3913,6 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
}
SET_VAL(pCtx, pCtx->size, 1);
pResInfo->hasResult = DATA_SET_FLAG;
}
......
......@@ -22,7 +22,6 @@
#include "taosdef.h"
#include "tscLog.h"
#include "qextbuffer.h"
#include "tscSecondaryMerge.h"
#include "tschemautil.h"
#include "tname.h"
......@@ -132,14 +131,14 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
for (int32_t i = 0; i < numOfRows; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i;
STR_WITH_MAXSIZE_TO_VARSTR(dst, pSchema[i].name, TSDB_COL_NAME_LEN - 1);
STR_WITH_MAXSIZE_TO_VARSTR(dst, pSchema[i].name, pField->bytes);
char *type = tDataTypeDesc[pSchema[i].type].aName;
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i;
STR_TO_VARSTR(dst, type);
STR_WITH_MAXSIZE_TO_VARSTR(dst, type, pField->bytes);
int32_t bytes = pSchema[i].bytes;
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
......@@ -157,7 +156,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
if (i >= tscGetNumOfColumns(pMeta) && tscGetNumOfTags(pMeta) != 0) {
char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
const char *src = "TAG";
STR_WITH_SIZE_TO_VARSTR(output, src, strlen(src));
STR_WITH_MAXSIZE_TO_VARSTR(output, src, pField->bytes);
}
}
......@@ -171,7 +170,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
// field name
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i;
STR_WITH_MAXSIZE_TO_VARSTR(output, pSchema[i].name, TSDB_COL_NAME_LEN - 1);
STR_WITH_MAXSIZE_TO_VARSTR(output, pSchema[i].name, pField->bytes);
// type name
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
......@@ -183,8 +182,12 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
// type length
int32_t bytes = pSchema[i].bytes;
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes / TSDB_NCHAR_SIZE;
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes -= VARSTR_HEADER_SIZE;
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes / TSDB_NCHAR_SIZE;
}
}
*(int32_t *)(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 2) * totalNumOfRows + pField->bytes * i) = bytes;
......@@ -193,7 +196,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3);
char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
const char *src = "TAG";
STR_WITH_SIZE_TO_VARSTR(target, src, strlen(src));
STR_WITH_MAXSIZE_TO_VARSTR(target, src, pField->bytes);
pTagValue += pSchema[i].bytes;
}
......@@ -220,15 +223,15 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE);
f.bytes = typeColLength;
f.bytes = typeColLength + VARSTR_HEADER_SIZE;
f.type = TSDB_DATA_TYPE_BINARY;
tstrncpy(f.name, "Type", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength,
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength + VARSTR_HEADER_SIZE,
typeColLength, false);
rowLen += typeColLength;
rowLen += typeColLength + VARSTR_HEADER_SIZE;
f.bytes = sizeof(int32_t);
f.type = TSDB_DATA_TYPE_INT;
......@@ -240,15 +243,15 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
rowLen += sizeof(int32_t);
f.bytes = noteColLength;
f.bytes = noteColLength + VARSTR_HEADER_SIZE;
f.type = TSDB_DATA_TYPE_BINARY;
tstrncpy(f.name, "Note", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength,
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength + VARSTR_HEADER_SIZE,
noteColLength, false);
rowLen += noteColLength;
rowLen += noteColLength + VARSTR_HEADER_SIZE;
return rowLen;
}
......
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscSecondaryMerge.h"
#include "os.h"
#include "tlosertree.h"
#include "tscUtil.h"
......@@ -21,6 +20,7 @@
#include "tsclient.h"
#include "tutil.h"
#include "tscLog.h"
#include "tscLocalMerge.h"
typedef struct SCompareParam {
SLocalDataSource **pLocalData;
......@@ -230,6 +230,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (ds == NULL) {
tscError("%p failed to create merge structure", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tfree(pReducer);
return;
}
......@@ -266,6 +267,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// no data actually, no need to merge result.
if (idx == 0) {
tfree(pReducer);
return;
}
......@@ -282,6 +284,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
if (pReducer->pLoserTree == NULL || pRes->code != 0) {
tfree(pReducer);
return;
}
......@@ -325,7 +328,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
tfree(pReducer->pResultBuf);
tfree(pReducer->pFinalRes);
tfree(pReducer->prevRowOfInput);
tfree(pReducer);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
}
......@@ -353,7 +356,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pRes->numOfGroups = 0;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);;
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t revisedSTime =
......@@ -410,13 +413,13 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor
}
int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, int32_t orderType) {
int32_t ret = tscFlushTmpBufferImpl(pMemoryBuf, pDesc, pPage, orderType);
if (ret != 0) {
return -1;
int32_t ret = 0;
if ((ret = tscFlushTmpBufferImpl(pMemoryBuf, pDesc, pPage, orderType)) != 0) {
return ret;
}
if (!tExtMemBufferFlush(pMemoryBuf)) {
return -1;
if ((ret = tExtMemBufferFlush(pMemoryBuf)) != 0) {
return ret;
}
return 0;
......@@ -437,9 +440,9 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
// current buffer is full, need to flushed to disk
assert(pPage->num == pModel->capacity);
int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType);
if (ret != 0) {
return -1;
int32_t code = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType);
if (code != 0) {
return code;
}
int32_t remain = numOfRows - numOfRemainEntries;
......@@ -455,8 +458,8 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows);
if (pPage->num == pModel->capacity) {
if (tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType) != TSDB_CODE_SUCCESS) {
return -1;
if ((code = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType)) != TSDB_CODE_SUCCESS) {
return code;
}
} else {
pPage->num = numOfWriteElems;
......@@ -685,6 +688,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tfree(pSchema);
return pRes->code;
}
......@@ -1092,14 +1096,6 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j];
// if (pExpr == NULL) {
// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL);
//
// maxOutput = 1;
// continue;
// }
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
......@@ -1109,8 +1105,9 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
continue;
}
if (maxOutput < GET_RES_INFO(&pCtx[j])->numOfRes) {
maxOutput = GET_RES_INFO(&pCtx[j])->numOfRes;
SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (maxOutput < pResInfo->numOfRes) {
maxOutput = pResInfo->numOfRes;
}
}
......@@ -1260,7 +1257,6 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
#ifdef _DEBUG_VIEW
printf("final result before interpo:\n");
assert(0);
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
#endif
......
......@@ -252,7 +252,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
numType = tscToInteger(pToken, &iv, &endptr);
if (TK_ILLEGAL == numType) {
return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
} else if (errno == ERANGE || iv > INT64_MAX || iv <= INT64_MIN) {
} else if (errno == ERANGE || iv == INT64_MIN) {
return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
}
......@@ -594,7 +594,6 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
const int factor = 5;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
assert(pDataBlock->headerSize >= 0);
// expand the allocated size
if (remain < rowSize * factor) {
......@@ -1328,12 +1327,14 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
int32_t ret = TSDB_CODE_SUCCESS;
if (initialParse) {
assert(!pSql->cmd.parseFinished);
char* p = pSql->sqlstr;
pSql->sqlstr = NULL;
tscPartiallyFreeSqlObj(pSql);
pSql->sqlstr = p;
} else {
} else if (!pSql->cmd.parseFinished) {
tscTrace("continue parse sql: %s", pSql->cmd.curSql);
}
......
此差异已折叠。
......@@ -32,7 +32,6 @@ int32_t tscGetNumOfTags(const STableMeta* pTableMeta) {
}
if (pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert(tinfo.numOfTags >= 0);
return tinfo.numOfTags;
}
......@@ -64,14 +63,6 @@ SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) {
STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
#if 0
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert (pTableMeta->pSTable != NULL);
return pTableMeta->pSTable->tableInfo;
}
#endif
return pTableMeta->tableInfo;
}
......@@ -119,11 +110,24 @@ bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
return (rowLen <= TSDB_MAX_BYTES_PER_ROW);
}
SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t startCol) {
SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) {
assert(pTableMeta != NULL);
SSchema* pSchema = (SSchema*) pTableMeta->schema;
return &pSchema[startCol];
return &pSchema[colIndex];
}
// TODO for large number of columns, employ the binary search method
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) {
if (pTableMeta->schema[i].colId == colId) {
return &pTableMeta->schema[i];
}
}
return NULL;
}
struct SSchema tscGetTbnameColumnSchema() {
......
......@@ -14,20 +14,18 @@
*/
#include "os.h"
#include "qsqltype.h"
#include "tcache.h"
#include "trpc.h"
#include "tscLocalMerge.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "tscLog.h"
#include "qsqltype.h"
#define TSC_MGMT_VNODE 999
......@@ -198,7 +196,9 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql,
.code = 0
};
rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
pSql->SRpcReqContext = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
assert(pSql->SRpcReqContext != NULL);
return TSDB_CODE_SUCCESS;
}
......@@ -274,7 +274,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_RPC_NETWORK_UNAVAIL;
} else {
tscTrace("%p query is cancelled, code:%d", pSql, tstrerror(pRes->code));
tscTrace("%p query is cancelled, code:%s", pSql, tstrerror(pRes->code));
}
if (pRes->code == TSDB_CODE_SUCCESS) {
......@@ -414,7 +414,6 @@ void tscKillSTableQuery(SSqlObj *pSql) {
for (int i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj *pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
......@@ -424,7 +423,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
// taosStopRpcConn(pSql->pSubs[i]->);
rpcCancelRequest(pSql->pSubs[i]->SRpcReqContext);
}
/*
......@@ -644,14 +643,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->queryType = htons(pQueryInfo->type);
pQueryMsg->queryType = htonl(pQueryInfo->type);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons(numOfOutput);
if (numOfOutput < 0) {
tscError("%p illegal value of number of output columns in query msg: %d", pSql, numOfOutput);
return -1;
}
// set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
......@@ -665,7 +660,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -723,6 +718,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
// todo add log
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);
......@@ -784,7 +780,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
(pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
tscError("%p sid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, total, numOfTagColumns,
pCol->colIndex, pColSchema->name);
pCol->colIndex.columnIndex, pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -800,6 +796,27 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
// serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len;
}
}
if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
*pMsg = 0;
pMsg++;
} else {
strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
}
// compressed ts block
pQueryMsg->tsOffset = htonl(pMsg - pStart);
int32_t tsLen = 0;
......@@ -824,27 +841,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
}
// serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len;
}
}
if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
*pMsg = 0;
pMsg++;
} else {
strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
}
int32_t msgLen = pMsg - pStart;
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
......@@ -983,7 +979,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strncpy(pDropDbMsg->db, pTableMetaInfo->name, tListLen(pDropDbMsg->db));
tstrncpy(pDropDbMsg->db, pTableMetaInfo->name, sizeof(pDropDbMsg->db));
pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DB;
......@@ -1053,7 +1049,7 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropMsg->user, pTableMetaInfo->name);
tstrncpy(pDropMsg->user, pTableMetaInfo->name, sizeof(pDropMsg->user));
return TSDB_CODE_SUCCESS;
}
......@@ -1175,7 +1171,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
// use dbinfo from table id without modifying current db info
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db);
tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;
......@@ -1252,7 +1248,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db);
tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
pAlterTableMsg->type = htons(pAlterInfo->type);
......@@ -1473,7 +1469,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += len;
}
pCmd->payloadLen = pMsg - (char*)pInfoMsg;;
pCmd->payloadLen = pMsg - (char*)pInfoMsg;
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
tfree(tmpData);
......@@ -1577,7 +1573,7 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = pStart;
SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pMgmt->db);
tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pMgmt->db);
pMsg += sizeof(SMgmtHead);
......@@ -1813,6 +1809,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
// todo handle out of memory case
if (pTableMetaInfo->pTableMeta == NULL) {
free(pTableMeta);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -1932,113 +1929,6 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
}
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
#if 0
void ** metricMetaList = NULL;
int32_t * sizes = NULL;
int32_t num = htons(*(int16_t *)rsp);
rsp += sizeof(int16_t);
metricMetaList = calloc(1, POINTER_BYTES * num);
sizes = calloc(1, sizeof(int32_t) * num);
// return with error code
if (metricMetaList == NULL || sizes == NULL) {
tfree(metricMetaList);
tfree(sizes);
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pSql->res.code;
}
for (int32_t k = 0; k < num; ++k) {
pMeta = (SSuperTableMeta *)rsp;
size_t size = (size_t)pSql->res.rspLen - 1;
rsp = rsp + sizeof(SSuperTableMeta);
pMeta->numOfTables = htonl(pMeta->numOfTables);
pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
pMeta->tagLen = htons(pMeta->tagLen);
size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableIdInfo *);
char *pBuf = calloc(1, size);
if (pBuf == NULL) {
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error_clean;
}
SSuperTableMeta *pNewMetricMeta = (SSuperTableMeta *)pBuf;
metricMetaList[k] = pNewMetricMeta;
pNewMetricMeta->numOfTables = pMeta->numOfTables;
pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes;
pNewMetricMeta->tagLen = pMeta->tagLen;
pBuf = pBuf + sizeof(SSuperTableMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *);
for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) {
SVnodeSidList *pSidLists = (SVnodeSidList *)rsp;
memcpy(pBuf, pSidLists, sizeof(SVnodeSidList));
pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta; // offset value
SVnodeSidList *pLists = (SVnodeSidList *)pBuf;
tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids);
pBuf += sizeof(SVnodeSidList) + sizeof(STableIdInfo *) * pSidLists->numOfSids;
rsp += sizeof(SVnodeSidList);
size_t elemSize = sizeof(STableIdInfo) + pNewMetricMeta->tagLen;
for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
memcpy(pBuf, rsp, elemSize);
((STableIdInfo *)pBuf)->uid = htobe64(((STableIdInfo *)pBuf)->uid);
((STableIdInfo *)pBuf)->sid = htonl(((STableIdInfo *)pBuf)->sid);
rsp += elemSize;
pBuf += elemSize;
}
}
sizes[k] = pBuf - (char *)pNewMetricMeta;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
for (int32_t i = 0; i < num; ++i) {
char name[TSDB_MAX_TAGS_LEN + 1] = {0};
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
tscGetMetricMetaCacheKey(pQueryInfo, name, pTableMetaInfo->pTableMeta->uid);
#ifdef _DEBUG_VIEW
printf("generate the metric key:%s, index:%d\n", name, i);
#endif
// release the used metricmeta
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
pTableMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
sizes[i], tsMetricMetaKeepTimer);
tfree(metricMetaList[i]);
// failed to put into cache
if (pTableMetaInfo->pMetricMeta == NULL) {
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error_clean;
}
}
_error_clean:
// free allocated resource
for (int32_t i = 0; i < num; ++i) {
tfree(metricMetaList[i]);
}
free(sizes);
free(metricMetaList);
#endif
SSqlRes* pRes = &pSql->res;
// NOTE: the order of several table must be preserved.
......@@ -2291,7 +2181,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
}
pRes->row = 0;
tscTrace("%p numOfRows:%d, offset:%d, complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
tscTrace("%p numOfRows:%" PRId64 ", offset:%" PRId64 ", complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
return 0;
}
......@@ -2432,6 +2322,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
SQueryInfo *pNewQueryInfo = NULL;
if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew);
return code;
}
......
......@@ -20,14 +20,9 @@
#include "tnote.h"
#include "trpc.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscompression.h"
#include "tsocket.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "tutil.h"
......@@ -589,7 +584,7 @@ char *taos_errstr(TAOS_RES *tres) {
void taos_config(int debug, char *log_path) {
uDebugFlag = debug;
strcpy(tsLogDir, log_path);
tstrncpy(tsLogDir, log_path, TSDB_FILENAME_LEN);
}
char *taos_get_server_info(TAOS *taos) {
......@@ -612,7 +607,9 @@ int* taos_fetch_lengths(TAOS_RES *res) {
char *taos_get_client_info() { return version; }
void taos_stop_query(TAOS_RES *res) {
if (res == NULL) return;
if (res == NULL) {
return;
}
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -632,7 +629,7 @@ void taos_stop_query(TAOS_RES *res) {
return;
}
//taosStopRpcConn(pSql->thandle);
rpcCancelRequest(pSql->SRpcReqContext);
tscTrace("%p query is cancelled", res);
}
......@@ -724,6 +721,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
if (sqlLen > tsMaxSQLStringLen) {
tscError("%p sql too long", pSql);
pRes->code = TSDB_CODE_TSC_INVALID_SQL;
tfree(pSql);
return pRes->code;
}
......@@ -732,6 +730,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql);
tscTrace("%p Valid SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
tfree(pSql);
return pRes->code;
}
......@@ -856,6 +855,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
if (tblListLen > MAX_TABLE_NAME_LENGTH) {
tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH);
pRes->code = TSDB_CODE_TSC_INVALID_SQL;
tfree(pSql);
return pRes->code;
}
......@@ -863,6 +863,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
if (str == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql);
tfree(pSql);
return pRes->code;
}
......@@ -878,6 +879,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
free(str);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
return pRes->code;
}
......
......@@ -125,7 +125,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
}
// launch stream computing in a new thread
SSchedMsg schedMsg;
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessStreamLaunchQuery;
schedMsg.ahandle = pStream;
schedMsg.thandle = (void *)1;
......@@ -239,7 +239,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
/* no resuls in the query range, retry */
// todo set retry dynamic time
int32_t retry = tsProjectExecInterval;
tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry);
tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry);
tscSetRetryTimer(pStream, pStream->pSql, retry);
return;
......@@ -250,7 +250,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
}
}
tscTrace("%p stream:%p, query on:%s, fetch result completed, fetched rows:%d", pSql, pStream, pTableMetaInfo->name,
tscTrace("%p stream:%p, query on:%s, fetch result completed, fetched rows:%" PRId64, pSql, pStream, pTableMetaInfo->name,
pStream->numOfRes);
// release the metric/meter meta information reference, so data in cache can be updated
......@@ -499,7 +499,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscFreeSqlObj(pSql);
return NULL;;
return NULL;
}
strtolower(pSql->sqlstr, sqlstr);
......
......@@ -238,7 +238,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
taosArraySort( tables, tscCompareTidTags );
tscBuildVgroupTableInfo( pTableMetaInfo, tables );
tscBuildVgroupTableInfo(pSql, pTableMetaInfo, tables);
}
taosArrayDestroy(tables);
......@@ -291,7 +291,7 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
fclose(fp);
taosArraySort(progress, tscCompareSubscriptionProgress);
tscTrace("subscription progress loaded, %d tables: %s", taosArrayGetSize(progress), pSub->topic);
tscTrace("subscription progress loaded, %zu tables: %s", taosArrayGetSize(progress), pSub->topic);
return 1;
}
......@@ -350,7 +350,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
pSub->interval = interval;
if (fp != NULL) {
tscTrace("asynchronize subscription, create new timer", topic);
tscTrace("asynchronize subscription, create new timer: %s", topic);
pSub->fp = fp;
pSub->param = param;
taosTmrReset(tscProcessSubscriptionTimer, interval, pSub, tscTmr, &pSub->pTimer);
......@@ -435,7 +435,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
} else {
char path[256];
sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic);
remove(path);
if (remove(path) != 0) {
tscError("failed to remove progress file, topic = %s, error = %s", pSub->topic, strerror(errno));
}
}
tscFreeSqlObj(pSub->pSql);
......
此差异已折叠。
......@@ -220,7 +220,7 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
if (strlen(tsLocale) == 0) { // locale does not set yet
char* defaultLocale = setlocale(LC_CTYPE, "");
strcpy(tsLocale, defaultLocale);
tstrncpy(tsLocale, defaultLocale, sizeof(tsLocale));
}
// set the user specified locale
......@@ -304,7 +304,7 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
assert(cfg != NULL);
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
strcpy(tsTimezone, pStr);
tstrncpy(tsTimezone, pStr, sizeof(tsTimezone));
tsSetTimeZone();
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscTrace("timezone set:%s, input:%s by taos_options", tsTimezone, pStr);
......
......@@ -22,7 +22,7 @@
#include "tkey.h"
#include "tmd5.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
#include "tscLocalMerge.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
......@@ -32,7 +32,7 @@
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
if (pTagCond->pCond == NULL) {
return NULL;
......@@ -70,13 +70,6 @@ void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) {
taosArrayPush(pTagCond->pCond, &cond);
}
bool tscQueryOnSTable(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) &&
(pCmd->msgType == TSDB_MSG_TYPE_QUERY);
}
bool tscQueryTags(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
......@@ -95,32 +88,8 @@ bool tscQueryTags(SQueryInfo* pQueryInfo) {
return true;
}
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) {
bool hasTags = false;
int32_t numOfSelectivity = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functId == TSDB_FUNC_TAG_DUMMY) {
hasTags = true;
continue;
}
if ((aAggs[functId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++;
}
}
if (numOfSelectivity > 0 && hasTags) {
return true;
}
return false;
}
void tscGetDBInfoFromMeterId(char* tableId, char* db) {
// todo refactor, extract methods and move the common module
void tscGetDBInfoFromTableFullName(char* tableId, char* db) {
char* st = strstr(tableId, TS_PATH_DELIMITER);
if (st != NULL) {
char* end = strstr(st + 1, TS_PATH_DELIMITER);
......@@ -181,8 +150,14 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
if (functionId != TSDB_FUNC_PRJ &&
functionId != TSDB_FUNC_TAGPRJ &&
functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS &&
functionId != TSDB_FUNC_ARITHM &&
functionId != TSDB_FUNC_TS_COMP &&
functionId != TSDB_FUNC_TID_TAG) {
return false;
}
}
......@@ -209,10 +184,14 @@ bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableInde
return pQueryInfo->order.orderColId >= 0;
}
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TS) {
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
return false;
}
}
......@@ -225,9 +204,10 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr == NULL) {
return false;
}
assert(pExpr != NULL);
// if (pExpr == NULL) {
// return false;
// }
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TAG) {
......@@ -238,6 +218,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
return false;
}
}
return true;
}
......@@ -1451,8 +1432,6 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
return false;
}
assert(pSql->fp != NULL);
STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) {
return false;
......@@ -1774,7 +1753,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex);
pNewQueryInfo->type = pPrevQueryInfo->type;
} else {
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery
}
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
......@@ -1799,19 +1778,26 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
// make sure the the sqlExpr for each fields is correct
// todo handle the agg arithmetic expression
// todo handle the agg arithmetic expression
numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
bool matched = false;
for(int32_t k1 = 0; k1 < numOfExprs; ++k1) {
SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
if (strcmp(field->name, pExpr1->aliasName) == 0) { // eatablish link according to the result field name
if (strcmp(field->name, pExpr1->aliasName) == 0) { // establish link according to the result field name
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f);
pInfo->pSqlExpr = pExpr1;
matched = true;
break;
}
}
assert(matched);
}
tscFieldInfoUpdateOffset(pNewQueryInfo);
......@@ -1849,12 +1835,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pFinalInfo->vgroupList != NULL);
}
if (cmd == TSDB_SQL_SELECT) {
size_t size = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace(
"%p new subquery:%p, tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d,"
"%p new subquery:%p, tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%zu, colList:%zu,"
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
size, pNewQueryInfo->fieldsInfo.numOfOutput, pFinalInfo->name, pNewQueryInfo->window.skey,
......@@ -1900,16 +1886,21 @@ void tscDoQuery(SSqlObj* pSql) {
}
if (QUERY_IS_JOIN_QUERY(type)) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) {
tscHandleMasterJoinQuery(pSql);
return;
} else {
// for first stage sub query, iterate all vnodes to get all timestamp
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
// doProcessSql(pSql);
assert(0);
} else { // for first stage sub query, iterate all vnodes to get all timestamp
if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
tscProcessSql(pSql);
} else { // secondary stage join query.
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscHandleMasterSTableQuery(pSql);
} else {
tscProcessSql(pSql);
}
}
}
return;
} else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscHandleMasterSTableQuery(pSql);
return;
......@@ -1919,11 +1910,13 @@ void tscDoQuery(SSqlObj* pSql) {
}
}
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) {
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
if (pTagCond->joinInfo.left.uid == uid) {
return pTagCond->joinInfo.left.tagCol;
return pTagCond->joinInfo.left.tagColId;
} else if (pTagCond->joinInfo.right.uid == uid) {
return pTagCond->joinInfo.right.tagColId;
} else {
return pTagCond->joinInfo.right.tagCol;
assert(0);
}
}
......@@ -1980,11 +1973,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
return false;
}
assert(pRes->completed);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pRes->completed);
// for normal table, no need to try any more if results are all retrieved from one vnode
if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) {
return false;
......@@ -2006,12 +1998,11 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* if case of: multi-vnode super table projection query
*/
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
while (++pTableMetaInfo->vgroupIndex < totalVgroups) {
tscTrace("%p results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%d", pSql,
tscTrace("%p results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%" PRId64, pSql,
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfClauseTotal);
/*
......
......@@ -24,19 +24,19 @@ TEST(testCase, parse_time) {
int64_t time = 0, time1 = 0;
taosParseTime(t1, &time, strlen(t1), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t1, &time, strlen(t1), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 1514739661952);
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, timezone * MILLISECOND_PER_SECOND);
char t2[] = "2018-1-1T1:1:1.952Z";
taosParseTime(t2, &time, strlen(t2), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t2, &time, strlen(t2), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 1514739661952 + 28800000);
char t3[] = "2018-1-1 1:01:01.952";
taosParseTime(t3, &time, strlen(t3), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t3, &time, strlen(t3), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 1514739661952);
char t4[] = "2018-1-1 1:01:01.9";
......@@ -45,122 +45,122 @@ TEST(testCase, parse_time) {
char t7[] = "2018-01-01 01:01:01.9";
char t8[] = "2018-01-01 01:01:01.9007865";
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t5, &time1, strlen(t5), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI, 0);
taosParseTime(t5, &time1, strlen(t5), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, time1);
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t6, &time1, strlen(t6), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI, 0);
taosParseTime(t6, &time1, strlen(t6), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, time1);
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t7, &time1, strlen(t7), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI, 0);
taosParseTime(t7, &time1, strlen(t7), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, time1);
taosParseTime(t5, &time, strlen(t5), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t8, &time1, strlen(t8), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t5, &time, strlen(t5), TSDB_TIME_PRECISION_MILLI, 0);
taosParseTime(t8, &time1, strlen(t8), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, time1);
char t9[] = "2017-4-3 1:1:2.980";
char t10[] = "2017-4-3T2:1:2.98+9:00";
taosParseTime(t9, &time, strlen(t9), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t10, &time1, strlen(t10), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t9, &time, strlen(t9), TSDB_TIME_PRECISION_MILLI, 0);
taosParseTime(t10, &time1, strlen(t10), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, time1);
char t11[] = "2017-4-3T2:1:2.98+09:00";
taosParseTime(t11, &time, strlen(t11), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t10, &time1, strlen(t10), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t11, &time, strlen(t11), TSDB_TIME_PRECISION_MILLI, 0);
taosParseTime(t10, &time1, strlen(t10), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, time1);
char t12[] = "2017-4-3T2:1:2.98+0900";
taosParseTime(t11, &time, strlen(t11), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t12, &time1, strlen(t12), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t11, &time, strlen(t11), TSDB_TIME_PRECISION_MILLI, 0);
taosParseTime(t12, &time1, strlen(t12), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, time1);
taos_options(TSDB_OPTION_TIMEZONE, "UTC");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 0);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Shanghai");
deltaToUtcInitOnce();
char t14[] = "1970-1-1T0:0:0Z";
taosParseTime(t14, &time, strlen(t14), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t14, &time, strlen(t14), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 0);
char t40[] = "1970-1-1 0:0:0.999999999";
taosParseTime(t40, &time, strlen(t40), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t40, &time, strlen(t40), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 999 + timezone * MILLISECOND_PER_SECOND);
char t41[] = "1997-1-1 0:0:0.999999999";
taosParseTime(t41, &time, strlen(t41), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t41, &time, strlen(t41), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 852048000999);
int64_t k = timezone;
char t42[] = "1997-1-1T0:0:0.999999999Z";
taosParseTime(t42, &time, strlen(t42), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t42, &time, strlen(t42), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 852048000999 - timezone * MILLISECOND_PER_SECOND);
////////////////////////////////////////////////////////////////////
// illegal timestamp format
char t15[] = "2017-12-33 0:0:0";
EXPECT_EQ(taosParseTime(t15, &time, strlen(t15), TSDB_TIME_PRECISION_MILLI), -1);
EXPECT_EQ(taosParseTime(t15, &time, strlen(t15), TSDB_TIME_PRECISION_MILLI, 0), -1);
char t16[] = "2017-12-31 99:0:0";
EXPECT_EQ(taosParseTime(t16, &time, strlen(t16), TSDB_TIME_PRECISION_MILLI), -1);
EXPECT_EQ(taosParseTime(t16, &time, strlen(t16), TSDB_TIME_PRECISION_MILLI, 0), -1);
char t17[] = "2017-12-31T9:0:0";
EXPECT_EQ(taosParseTime(t17, &time, strlen(t17), TSDB_TIME_PRECISION_MILLI), -1);
EXPECT_EQ(taosParseTime(t17, &time, strlen(t17), TSDB_TIME_PRECISION_MILLI, 0), -1);
char t18[] = "2017-12-31T9:0:0.Z";
EXPECT_EQ(taosParseTime(t18, &time, strlen(t18), TSDB_TIME_PRECISION_MILLI), -1);
EXPECT_EQ(taosParseTime(t18, &time, strlen(t18), TSDB_TIME_PRECISION_MILLI, 0), -1);
char t19[] = "2017-12-31 9:0:0.-1";
EXPECT_EQ(taosParseTime(t19, &time, strlen(t19), TSDB_TIME_PRECISION_MILLI), -1);
EXPECT_EQ(taosParseTime(t19, &time, strlen(t19), TSDB_TIME_PRECISION_MILLI, 0), -1);
char t20[] = "2017-12-31 9:0:0.1+12:99";
EXPECT_EQ(taosParseTime(t20, &time, strlen(t20), TSDB_TIME_PRECISION_MILLI), 0);
EXPECT_EQ(taosParseTime(t20, &time, strlen(t20), TSDB_TIME_PRECISION_MILLI, 0), 0);
EXPECT_EQ(time, 1514682000100);
char t21[] = "2017-12-31T9:0:0.1+12:99";
EXPECT_EQ(taosParseTime(t21, &time, strlen(t21), TSDB_TIME_PRECISION_MILLI), -1);
EXPECT_EQ(taosParseTime(t21, &time, strlen(t21), TSDB_TIME_PRECISION_MILLI, 0), -1);
char t22[] = "2017-12-31 9:0:0.1+13:1";
EXPECT_EQ(taosParseTime(t22, &time, strlen(t22), TSDB_TIME_PRECISION_MILLI), 0);
EXPECT_EQ(taosParseTime(t22, &time, strlen(t22), TSDB_TIME_PRECISION_MILLI, 0), 0);
char t23[] = "2017-12-31T9:0:0.1+13:1";
EXPECT_EQ(taosParseTime(t23, &time, strlen(t23), TSDB_TIME_PRECISION_MILLI), 0);
EXPECT_EQ(taosParseTime(t23, &time, strlen(t23), TSDB_TIME_PRECISION_MILLI, 0), 0);
//======================== add some case ============================//
char b1[] = "9999-12-31 23:59:59.999";
taosParseTime(b1, &time, strlen(b1), TSDB_TIME_PRECISION_MILLI);
taosParseTime(b1, &time, strlen(b1), TSDB_TIME_PRECISION_MILLI,0);
EXPECT_EQ(time, 253402271999999);
char b2[] = "2020-01-01 01:01:01.321";
taosParseTime(b2, &time, strlen(b2), TSDB_TIME_PRECISION_MILLI);
taosParseTime(b2, &time, strlen(b2), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 1577811661321);
taos_options(TSDB_OPTION_TIMEZONE, "America/New_York");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 18000 * MILLISECOND_PER_SECOND);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Tokyo");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, -32400 * MILLISECOND_PER_SECOND);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Shanghai");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, -28800 * MILLISECOND_PER_SECOND);
}
......
......@@ -36,7 +36,7 @@ extern "C" {
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)); \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \
varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \
} while (0)
......
......@@ -92,10 +92,6 @@ extern int32_t tsNumOfMnodes;
extern int32_t tsMaxShellConns;
extern int32_t tsMaxTables;
extern char tsDefaultDB[];
extern char tsDefaultUser[];
extern char tsDefaultPass[];
extern char tsMqttBrokerAddress[];
extern char tsMqttBrokerClientId[];
......
......@@ -105,7 +105,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
}
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
if (!isValidDataType(type, 0)) return -1;
if (!isValidDataType(type)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
......@@ -588,7 +588,7 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
if (kvRowNCols(nrow) - colIdx - 1 > 0) {
for (int i = colIdx + 1; i < kvRowNCols(nrow); i++) {
kvRowColIdxAt(nrow, i)->colId = kvRowColIdxAt(row, i)->colId;
kvRowColIdxAt(nrow, i)->offset += diff;
kvRowColIdxAt(nrow, i)->offset = kvRowColIdxAt(row, i)->offset + diff;
}
memcpy(kvRowColVal(nrow, kvRowColIdxAt(nrow, colIdx + 1)), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1)),
POINTER_DISTANCE(kvRowEnd(row), kvRowColVal(row, kvRowColIdxAt(row, colIdx + 1))));
......
......@@ -109,10 +109,6 @@ int32_t tsReplications = TSDB_DEFAULT_REPLICA_NUM;
int16_t tsAffectedRowsMod = 0;
int32_t tsNumOfMnodes = 3;
int32_t tsMaxShellConns = 5000;
char tsDefaultDB[TSDB_DB_NAME_LEN] = {0};
char tsDefaultUser[64] = "root";
char tsDefaultPass[64] = "taosdata";
int32_t tsMaxConnections = 5000;
int32_t tsBalanceInterval = 300; // seconds
......@@ -713,37 +709,6 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// login configs
cfg.option = "defaultDB";
cfg.ptr = tsDefaultDB;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_DB_NAME_LEN - 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "defaultUser";
cfg.ptr = tsDefaultUser;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_USER_LEN - 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "defaultPass";
cfg.ptr = tsDefaultPass;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_PASSWORD_LEN - 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttBrokerAddress";
cfg.ptr = tsMqttBrokerAddress;
cfg.valType = TAOS_CFG_VTYPE_STRING;
......
......@@ -363,16 +363,8 @@ char tTokenTypeSwitcher[13] = {
TSDB_DATA_TYPE_NCHAR, // TK_NCHAR
};
bool isValidDataType(int32_t type, int32_t length) {
if (type < TSDB_DATA_TYPE_NULL || type > TSDB_DATA_TYPE_NCHAR) {
return false;
}
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
// return length >= 0 && length <= TSDB_MAX_BINARY_LEN;
}
return true;
bool isValidDataType(int32_t type) {
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_NCHAR;
}
bool isNull(const char *val, int32_t type) {
......
......@@ -256,11 +256,30 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SDataRow trow = (SDataRow)pBlk->data;
tdInitDataRow(trow, pSchema);
union {
char buf[sizeof(int64_t)];
tstr str;
} nullVal;
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
STColumn *c = pSchema->columns + i;
char* val = (char*)row[i];
if (IS_VAR_DATA_TYPE(c->type)) {
val -= sizeof(VarDataLenT);
if (val == NULL) {
val = nullVal.buf;
if (c->type == TSDB_DATA_TYPE_BINARY) {
setNull(nullVal.str.data, TSDB_DATA_TYPE_BINARY, 1);
nullVal.str.len = 1;
} else {
setNull(nullVal.str.data, TSDB_DATA_TYPE_NCHAR, 4);
nullVal.str.len = 4;
}
} else {
val -= sizeof(VarDataLenT);
}
} else if (val == NULL) {
val = nullVal.buf;
setNull(val, c->type, c->bytes);
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
}
......
......@@ -48,8 +48,8 @@ int main(int argc, char *argv[]) {
taosInitLog("cq.log", 100000, 10);
SCqCfg cqCfg;
strcpy(cqCfg.user, "root");
strcpy(cqCfg.pass, "taosdata");
strcpy(cqCfg.user, TSDB_DEFAULT_USER);
strcpy(cqCfg.pass, TSDB_DEFAULT_PASS);
cqCfg.vgId = 2;
cqCfg.cqWrite = writeToQueue;
......
......@@ -118,6 +118,8 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
mnodeCreateMsg(pWrite, pMsg);
dTrace("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
......@@ -128,6 +130,7 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
SMnodeMsg *pWrite = pRaw;
if (pWrite == NULL) return;
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
dnodeReprocessMnodeWriteMsg(pWrite);
......@@ -146,19 +149,21 @@ void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
}
static void *dnodeProcessMnodeWriteQueue(void *param) {
SMnodeMsg *pWriteMsg;
SMnodeMsg *pWrite;
int32_t type;
void * unUsed;
while (1) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWriteMsg, &unUsed) == 0) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
dTrace("dnodeProcessMnodeWriteQueue: got no message from qset, exiting...");
break;
}
dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->rpcMsg.ahandle, taosMsg[pWriteMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessWrite(pWriteMsg);
dnodeSendRpcMnodeWriteRsp(pWriteMsg, code);
dTrace("app:%p:%p, msg:%s will be processed in mwrite queue", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType]);
int32_t code = mnodeProcessWrite(pWrite);
dnodeSendRpcMnodeWriteRsp(pWrite, code);
}
return NULL;
......@@ -168,9 +173,15 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
SMnodeMsg *pWrite = pMsg;
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
dTrace("app:%p:%p, msg:%s is redirected for mnode not running, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
dnodeSendRedirectMsg(pMsg, true);
dnodeFreeMnodeWriteMsg(pWrite);
} else {
} else {
dTrace("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
}
......
......@@ -407,11 +407,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
}
SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess;
for (int32_t i = 0; i < pCfg->numOfVnodes; ++i) {
pVgAcccess[i].vgId = htonl(pVgAcccess[i].vgId);
}
vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
dnodeProcessModuleStatus(pCfg->moduleStatus);
dnodeUpdateDnodeCfg(pCfg);
......@@ -616,6 +612,16 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
// fill cluster cfg parameters
pStatus->clusterCfg.numOfMnodes = tsNumOfMnodes;
pStatus->clusterCfg.mnodeEqualVnodeNum = tsMnodeEqualVnodeNum;
pStatus->clusterCfg.offlineThreshold = tsOfflineThreshold;
pStatus->clusterCfg.statusInterval = tsStatusInterval;
strcpy(pStatus->clusterCfg.arbitrator, tsArbitrator);
strcpy(pStatus->clusterCfg.timezone, tsTimezone);
strcpy(pStatus->clusterCfg.locale, tsLocale);
strcpy(pStatus->clusterCfg.charset, tsCharset);
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
......
......@@ -32,11 +32,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs
/**
* Destroy QInfo object
*
* @param qinfo
* @return
* @param qinfo qhandle
* @param fp destroy callback function, while the qhandle is destoried, invoke the fp
* @param param free callback params
*/
void qDestroyQueryInfo(qinfo_t qinfo);
void qDestroyQueryInfo(qinfo_t qinfo, void (*fp)(void*), void* param);
/**
* the main query execution function, including query on both table and multitables,
......@@ -45,7 +45,7 @@ void qDestroyQueryInfo(qinfo_t qinfo);
* @param qinfo
* @return
*/
void qTableQuery(qinfo_t qinfo);
void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param);
/**
* Retrieve the produced results information, if current query is not paused or completed,
......@@ -80,9 +80,12 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
/**
* kill current ongoing query and free query handle automatically
* @param qinfo
* @param qinfo qhandle
* @param fp destroy callback function, while the qhandle is destoried, invoke the fp
* @param param free callback params
* @return
*/
int32_t qKillQuery(qinfo_t qinfo);
int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param);
#ifdef __cplusplus
}
......
......@@ -55,7 +55,7 @@ typedef enum {
typedef struct taosField {
char name[65];
uint8_t type;
short bytes;
int16_t bytes;
} TAOS_FIELD;
#ifdef _TD_GO_DLL_
......
......@@ -83,6 +83,9 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_DATA_NULL_STR "NULL"
#define TSDB_DATA_NULL_STR_L "null"
#define TSDB_DEFAULT_USER "root"
#define TSDB_DEFAULT_PASS "taosdata"
#define TSDB_TRUE 1
#define TSDB_FALSE 0
#define TSDB_OK 0
......@@ -156,7 +159,7 @@ typedef struct tDataTypeDescriptor {
extern tDataTypeDescriptor tDataTypeDesc[11];
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
bool isValidDataType(int32_t type, int32_t length);
bool isValidDataType(int32_t type);
bool isNull(const char *val, int32_t type);
void setVardataNull(char* val, int32_t type);
......@@ -209,8 +212,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_SQL_SHOW_LEN 256
#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb
#define TSDB_MAX_BYTES_PER_ROW 65535
#define TSDB_MAX_TAGS_LEN 65535
#define TSDB_MAX_BYTES_PER_ROW 16384
#define TSDB_MAX_TAGS_LEN 16384
#define TSDB_MAX_TAGS 128
#define TSDB_AUTH_LEN 16
......
......@@ -113,14 +113,19 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, 0, 0x030C, "mnode inva
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, 0, 0x030D, "mnode invalid stream id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, 0, 0x030E, "mnode invalid connection")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "mnode object already there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "mnode sdb error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "sdb object already there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "sdb app error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE, 0, 0x0322, "sdb invalid table type")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_NOT_THERE, 0, 0x0323, "sdb object not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_META_ROW, 0, 0x0324, "sdb invalid meta row")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_KEY_TYPE, 0, 0x0325, "sdb invalid key type")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, 0, 0x0330, "mnode dnode already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, 0, 0x0331, "mnode dnode not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, 0, 0x0332, "mnode vgroup not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_REMOVE_MASTER, 0, 0x0333, "mnode cant not remove master")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_ENOUGH_DNODES, 0, 0x0334, "mnode no enough dnodes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT, 0, 0x0335, "mnode cluster cfg inconsistent")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "mnode accounts already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "mnode invalid account")
......@@ -174,6 +179,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "vnode no d
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "vnode no such file or directory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "vnode out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "vnode app error")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0214, "vnode no write auth")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id")
......@@ -195,6 +201,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_MSG, 0, 0x0701, "query inva
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NO_DISKSPACE, 0, 0x0702, "query no diskspace")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_OUT_OF_MEMORY, 0, 0x0703, "query out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_APP_ERROR, 0, 0x0704, "query app error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "query duplicated join key")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "grant expired")
......
......@@ -455,7 +455,7 @@ typedef struct {
int16_t orderType; // used in group by xx order by xxx
int64_t limit;
int64_t offset;
uint16_t queryType; // denote another query process
uint32_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers
int16_t tagNameRelType; // relation of tag criteria and tbname criteria
int16_t fillType; // interpolate type
......@@ -543,6 +543,7 @@ typedef struct {
int32_t dnodeId;
uint32_t moduleStatus;
uint32_t numOfVnodes;
uint32_t reserved;
} SDMDnodeCfg;
typedef struct {
......@@ -557,18 +558,30 @@ typedef struct {
} SDMMnodeInfos;
typedef struct {
uint32_t version;
int32_t dnodeId;
char dnodeEp[TSDB_EP_LEN];
uint32_t moduleStatus;
uint32_t lastReboot; // time stamp for last reboot
uint16_t numOfTotalVnodes; // from config file
uint16_t openVnodes;
uint16_t numOfCores;
float diskAvailable; // GB
uint8_t alternativeRole;
uint8_t reserve[15];
SVnodeLoad load[];
int32_t numOfMnodes; // tsNumOfMnodes
int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum
int32_t offlineThreshold; // tsOfflineThreshold
int32_t statusInterval; // tsStatusInterval
char arbitrator[TSDB_EP_LEN]; // tsArbitrator
char timezone[64]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale
char charset[TSDB_LOCALE_LEN]; // tsCharset
} SClusterCfg;
typedef struct {
uint32_t version;
int32_t dnodeId;
char dnodeEp[TSDB_EP_LEN];
uint32_t moduleStatus;
uint32_t lastReboot; // time stamp for last reboot
uint16_t numOfTotalVnodes; // from config file
uint16_t openVnodes;
uint16_t numOfCores;
float diskAvailable; // GB
uint8_t alternativeRole;
uint8_t reserve[15];
SClusterCfg clusterCfg;
SVnodeLoad load[];
} SDMStatusMsg;
typedef struct {
......
......@@ -84,7 +84,7 @@ void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCanelRequest(void *pContext);
void rpcCancelRequest(void *pContext);
#ifdef __cplusplus
}
......
......@@ -58,6 +58,7 @@ void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
void vnodeBuildStatusMsg(void * param);
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
......
......@@ -56,11 +56,11 @@ TAOS *shellInit(SShellArguments *args) {
if (args->is_use_passwd) {
if (args->password == NULL) args->password = getpass("Enter password: ");
} else {
args->password = tsDefaultPass;
args->password = TSDB_DEFAULT_PASS;
}
if (args->user == NULL) {
args->user = tsDefaultUser;
args->user = TSDB_DEFAULT_USER;
}
taos_init();
......@@ -276,6 +276,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
st = taosGetTimestampUs();
TAOS_RES* pSql = taos_query(con, command);
result = pSql; // set it into the global variable
if (taos_errno(pSql)) {
taos_error(pSql);
return;
......@@ -284,7 +286,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\n\n");
fflush(stdout);
result = NULL;
taos_free_result(pSql);
return;
}
......@@ -294,6 +297,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
int error_no = 0;
int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
if (numOfRows < 0) {
result = NULL;
taos_free_result(pSql);
return;
}
......@@ -315,7 +319,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
if (fname != NULL) {
wordfree(&full_path);
}
result = NULL;
taos_free_result(pSql);
}
......@@ -419,8 +424,8 @@ static void dumpFieldToFile(FILE* fp, const char* val, TAOS_FIELD* field, int32_
}
}
static int dumpResultToFile(const char* fname, TAOS_RES* result) {
TAOS_ROW row = taos_fetch_row(result);
static int dumpResultToFile(const char* fname, TAOS_RES* tres) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
}
......@@ -441,9 +446,9 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) {
wordfree(&full_path);
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
int precision = taos_result_precision(result);
int num_fields = taos_num_fields(tres);
TAOS_FIELD *fields = taos_fetch_fields(tres);
int precision = taos_result_precision(tres);
for (int col = 0; col < num_fields; col++) {
if (col > 0) {
......@@ -455,7 +460,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) {
int numOfRows = 0;
do {
int32_t* length = taos_fetch_lengths(result);
int32_t* length = taos_fetch_lengths(tres);
for (int i = 0; i < num_fields; i++) {
if (i > 0) {
fputc(',', fp);
......@@ -465,10 +470,13 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) {
fputc('\n', fp);
numOfRows++;
row = taos_fetch_row(result);
row = taos_fetch_row(tres);
} while( row != NULL);
result = NULL;
taos_free_result(tres);
fclose(fp);
return numOfRows;
}
......@@ -769,8 +777,7 @@ void write_history() {
void taos_error(TAOS_RES *tres) {
fprintf(stderr, "\nDB error: %s\n", taos_errstr(tres));
/* free local resouce: allocated memory/metric-meta refcnt */
result = NULL;
taos_free_result(tres);
}
......@@ -845,9 +852,9 @@ void shellGetGrantInfo(void *con) {
char sql[] = "show grants";
result = taos_query(con, sql);
TAOS_RES* tres = taos_query(con, sql);
int code = taos_errno(result);
int code = taos_errno(tres);
if (code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_COM_OPS_NOT_SUPPORT) {
fprintf(stdout, "Server is Community Edition, version is %s\n\n", taos_get_server_info(con));
......@@ -857,18 +864,18 @@ void shellGetGrantInfo(void *con) {
return;
}
int num_fields = taos_field_count(result);
int num_fields = taos_field_count(tres);
if (num_fields == 0) {
fprintf(stderr, "\nInvalid grant information.\n");
exit(0);
} else {
if (result == NULL) {
if (tres == NULL) {
fprintf(stderr, "\nGrant information is null.\n");
exit(0);
}
TAOS_FIELD *fields = taos_fetch_fields(result);
TAOS_ROW row = taos_fetch_row(result);
TAOS_FIELD *fields = taos_fetch_fields(tres);
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
fprintf(stderr, "\nFailed to get grant information from server. Abort.\n");
exit(0);
......@@ -888,8 +895,8 @@ void shellGetGrantInfo(void *con) {
fprintf(stdout, "Server is Enterprise %s Edition, version is %s and will expire at %s.\n", serverVersion, taos_get_server_info(con), expiretime);
}
taos_free_result(result);
result = NULL;
taos_free_result(tres);
}
fprintf(stdout, "\n");
......
......@@ -15,7 +15,6 @@
#include "os.h"
#include "shell.h"
#include "tsclient.h"
pthread_t pid;
......@@ -23,14 +22,6 @@ pthread_t pid;
void interruptHandler(int signum) {
#ifdef LINUX
taos_stop_query(result);
if (result != NULL) {
/*
* we need to free result in async model, in order to avoid free
* results while the master thread is waiting for server response.
*/
tscQueueAsyncFreeResult(result);
}
result = NULL;
#else
printf("\nReceive ctrl+c or other signal, quit shell.\n");
......
......@@ -306,7 +306,7 @@ double getCurrentTime();
void callBack(void *param, TAOS_RES *res, int code);
int main(int argc, char *argv[]) {
SDemoArguments arguments = {NULL, // host
SDemoArguments arguments = { NULL, // host
0, // port
"root", // user
"taosdata", // password
......
......@@ -34,6 +34,8 @@
#include "taosdef.h"
#include "tutil.h"
#include "tglobal.h"
#define COMMAND_SIZE 65536
#define DEFAULT_DUMP_FILE "taosdump.sql"
......@@ -293,7 +295,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
static struct argp argp = {options, parse_opt, args_doc, doc};
TAOS *taos = NULL;
TAOS_RES *result = NULL;
char *command = NULL;
char *lcommand = NULL;
char *buffer = NULL;
......@@ -324,7 +325,7 @@ void taosFreeDbInfos();
int main(int argc, char *argv[]) {
SDumpArguments arguments = {
// connection option
NULL, "root", "taosdata", 0,
NULL, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS, 0,
// output file
DEFAULT_DUMP_FILE, DEFAULT_DUMP_FILE, NULL,
// dump unit option
......@@ -463,10 +464,10 @@ int taosDumpOut(SDumpArguments *arguments) {
taosDumpCharset(fp);
sprintf(command, "show databases");
result = taos_query(taos, command);
TAOS_RES* result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(taos));
fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(result));
taos_free_result(result);
goto _exit_failure;
}
......@@ -502,7 +503,7 @@ int taosDumpOut(SDumpArguments *arguments) {
}
strncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX], fields[TSDB_SHOW_DB_NAME_INDEX].bytes);
if (strcmp(arguments->user, "root") == 0) {
if (strcmp(arguments->user, TSDB_DEFAULT_USER) == 0) {
dbInfos[count]->replica = (int)(*((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]));
dbInfos[count]->days = (int)(*((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]));
dbInfos[count]->keep = *((int *)row[TSDB_SHOW_DB_KEEP_INDEX]);
......@@ -613,7 +614,7 @@ int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) {
fprintf(fp, "USE %s\n\n", dbInfo->name);
sprintf(command, "show tables");
result = taos_query(taos,command);
TAOS_RES* result = taos_query(taos,command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
......@@ -717,7 +718,7 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols
sprintf(command, "select %s from %s limit 1", tableDes->cols[counter].field, tableDes->name);
result = taos_query(taos, command);
TAOS_RES* result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
......@@ -795,7 +796,7 @@ int taosGetTableDes(char *table, STableDef *tableDes) {
sprintf(command, "describe %s", table);
result = taos_query(taos, command);
TAOS_RES* result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
......@@ -875,7 +876,7 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) {
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
sprintf(command, "select tbname from %s", metric);
result = taos_query(taos, command);
TAOS_RES* result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
......@@ -928,7 +929,7 @@ int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) {
sprintf(command, "select * from %s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc", tbname, arguments->start_time,
arguments->end_time);
result = taos_query(taos, command);
TAOS_RES* result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, reason: %s\n", command, taos_errstr(result));
......@@ -1177,9 +1178,13 @@ int taosDumpIn(SDumpArguments *arguments) {
tcommand = command;
}
taosReplaceCtrlChar(tcommand);
if (taos_query(taos, tcommand) == NULL)
TAOS_RES* result = taos_query(taos, tcommand);
if (taos_errno(result) != 0){
fprintf(stderr, "linenu: %" PRId64 " failed to run command %s reason:%s \ncontinue...\n", linenu, command,
taos_errstr(taos));
taos_errstr(result));
taos_free_result(result);
}
pstr = command;
pstr[0] = '\0';
......@@ -1225,12 +1230,12 @@ int taosDumpIn(SDumpArguments *arguments) {
tcommand = command;
}
taosReplaceCtrlChar(tcommand);
result = taos_query(taos, tcommand);
TAOS_RES* result = taos_query(taos, tcommand);
int32_t code = taos_errno(result);
if (code != 0)
{
fprintf(stderr, "linenu:%" PRId64 " failed to run command %s reason: %s \ncontinue...\n", linenu, command,
taos_errstr(taos));
taos_errstr(result));
}
taos_free_result(result);
}
......
......@@ -122,7 +122,8 @@ typedef struct SVgObj {
int32_t lbDnodeId;
int32_t lbTime;
int8_t inUse;
int8_t reserved[13];
int8_t accessState;
int8_t reserved[12];
int8_t updateEnd[1];
int32_t refCount;
struct SVgObj *prev, *next;
......
......@@ -47,7 +47,7 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode);
void * mnodeGetDnode(int32_t dnodeId);
void * mnodeGetDnodeByEp(char *ep);
void mnodeUpdateDnode(SDnodeObj *pDnode);
int32_t mnodeDropDnode(SDnodeObj *pDnode);
int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg);
extern int32_t tsAccessSquence;
......
......@@ -36,10 +36,10 @@ extern int32_t sdbDebugFlag;
#define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
#define mLPrint(...) { monitorSaveLog(0, __VA_ARGS__); mPrint(__VA_ARGS__) }
#define sdbError(...) { if (sdbDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR MND-SDB ", 255, __VA_ARGS__); }}
#define sdbWarn(...) { if (sdbDebugFlag & DEBUG_WARN) { taosPrintLog("WARN MND-SDB ", sdbDebugFlag, __VA_ARGS__); }}
#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("MND-SDB ", sdbDebugFlag, __VA_ARGS__);}}
#define sdbPrint(...) { taosPrintLog("MND-SDB ", 255, __VA_ARGS__); }
#define sdbError(...) { if (sdbDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR SDB ", 255, __VA_ARGS__); }}
#define sdbWarn(...) { if (sdbDebugFlag & DEBUG_WARN) { taosPrintLog("WARN SDB ", sdbDebugFlag, __VA_ARGS__); }}
#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__);}}
#define sdbPrint(...) { taosPrintLog("SDB ", 255, __VA_ARGS__); }
#define sdbLError(...) { monitorSaveLog(2, __VA_ARGS__); sdbError(__VA_ARGS__) }
#define sdbLWarn(...) { monitorSaveLog(1, __VA_ARGS__); sdbWarn(__VA_ARGS__) }
......
......@@ -20,6 +20,8 @@
extern "C" {
#endif
struct SMnodeMsg;
typedef enum {
SDB_TABLE_DNODE = 0,
SDB_TABLE_MNODE = 1,
......@@ -48,8 +50,11 @@ typedef struct {
ESdbOper type;
void * table;
void * pObj;
int32_t rowSize;
void * rowData;
int32_t rowSize;
int32_t retCode; // for callback in sdb queue
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
struct SMnodeMsg *pMsg;
} SSdbOper;
typedef struct {
......
......@@ -28,7 +28,8 @@ void * mnodeGetNextUser(void *pIter, SUserObj **pUser);
void mnodeIncUserRef(SUserObj *pUser);
void mnodeDecUserRef(SUserObj *pUser);
SUserObj *mnodeGetUserFromConn(void *pConn);
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass);
char * mnodeGetUserFromMsg(void *pMnodeMsg);
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg);
void mnodeDropAllUsers(SAcctObj *pAcct);
#ifdef __cplusplus
......
......@@ -34,7 +34,8 @@ void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb);
void * mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup);
void mnodeUpdateVgroup(SVgObj *pVgroup);
void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *dnodeId, SVnodeLoad *pVload);
void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVload);
void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_t openVnodes);
int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg, SDbObj *pDb);
void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle);
......
......@@ -25,6 +25,8 @@
#include "mnodeSdb.h"
#include "mnodeUser.h"
#include "tglobal.h"
void * tsAcctSdb = NULL;
static int32_t tsAcctUpdateSize;
static int32_t mnodeCreateRootAcct();
......@@ -39,6 +41,7 @@ static int32_t mnodeAcctActionDestroy(SSdbOper *pOper) {
static int32_t mnodeAcctActionInsert(SSdbOper *pOper) {
SAcctObj *pAcct = pOper->pObj;
memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo));
pAcct->acctInfo.accessState = TSDB_VN_ALL_ACCCESS;
pthread_mutex_init(&pAcct->mutex, NULL);
return TSDB_CODE_SUCCESS;
}
......@@ -78,7 +81,9 @@ static int32_t mnodeAcctActionDecode(SSdbOper *pOper) {
}
static int32_t mnodeAcctActionRestored() {
if (dnodeIsFirstDeploy()) {
int32_t numOfRows = sdbGetNumOfRows(tsAcctSdb);
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mPrint("dnode first deploy, create root acct");
int32_t code = mnodeCreateRootAcct();
if (code != TSDB_CODE_SUCCESS) {
mError("failed to create root account, reason:%s", tstrerror(code));
......@@ -171,8 +176,8 @@ static int32_t mnodeCreateRootAcct() {
SAcctObj *pAcct = malloc(sizeof(SAcctObj));
memset(pAcct, 0, sizeof(SAcctObj));
strcpy(pAcct->user, "root");
taosEncryptPass((uint8_t*)"taosdata", strlen("taosdata"), pAcct->pass);
strcpy(pAcct->user, TSDB_DEFAULT_USER);
taosEncryptPass((uint8_t *)TSDB_DEFAULT_PASS, strlen(TSDB_DEFAULT_PASS), pAcct->pass);
pAcct->cfg = (SAcctCfg){
.maxUsers = 10,
.maxDbs = 64,
......@@ -204,4 +209,4 @@ int32_t acctInit() { return TSDB_CODE_SUCCESS; }
void acctCleanUp() {}
int32_t acctCheck(void *pAcct, EAcctGrantType type) { return TSDB_CODE_SUCCESS; }
#endif
\ No newline at end of file
#endif
......@@ -41,7 +41,7 @@
static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg);
static int32_t mnodeDropDb(SMnodeMsg *newMsg);
static int32_t mnodeSetDbDropping(SDbObj *pDb);
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
......@@ -311,7 +311,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->replications < 0) pCfg->replications = tsReplications;
}
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg) {
int32_t code = acctCheck(pAcct, ACCT_GRANT_DB);
if (code != 0) return code;
......@@ -364,12 +364,15 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
.table = tsDbSdb,
.pObj = pDb,
.rowSize = sizeof(SDbObj),
.pMsg = pMsg
};
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pDb);
code = TSDB_CODE_MND_SDB_ERROR;
} else {
mLPrint("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
......@@ -475,7 +478,7 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
cols++;
#ifndef __CLOUD_VERSION__
if (strcmp(pUser->user, "root") == 0) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
#endif
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......@@ -487,7 +490,7 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
#endif
#ifndef __CLOUD_VERSION__
if (strcmp(pUser->user, "root") == 0) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
#endif
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
......@@ -511,7 +514,7 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
cols++;
#ifndef __CLOUD_VERSION__
if (strcmp(pUser->user, "root") == 0) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
#endif
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......@@ -613,7 +616,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* name = mnodeGetDbStr(pDb->name);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, TSDB_DB_NAME_LEN - 1);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -625,7 +628,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
cols++;
#ifndef __CLOUD_VERSION__
if (strcmp(pUser->user, "root") == 0) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
#endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->numOfVgroups;
......@@ -635,7 +638,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
#endif
#ifndef __CLOUD_VERSION__
if (strcmp(pUser->user, "root") == 0) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
#endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->cfg.replications;
......@@ -656,7 +659,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
cols++;
#ifndef __CLOUD_VERSION__
if (strcmp(pUser->user, "root") == 0) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
#endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxTables; // table num can be created should minus 1
......@@ -771,12 +774,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
} else if (!pMsg->pUser->writeAuth) {
code = TSDB_CODE_MND_NO_RIGHTS;
} else {
code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("db:%s, is created by %s", pCreate->db, pMsg->pUser->user);
} else {
mError("db:%s, failed to create, reason:%s", pCreate->db, tstrerror(code));
}
code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate, pMsg);
}
return code;
......@@ -893,7 +891,31 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return newCfg;
}
static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) return code;
SDbObj *pDb = pMsg->pDb;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb) {
mnodeSendCreateVgroupMsg(pVgroup, NULL);
}
mnodeDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
mTrace("db:%s, all vgroups is altered", pDb->name);
mLPrint("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
balanceNotify();
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) {
SDbCfg newCfg = mnodeGetAlterDbOption(pDb, pAlter);
if (terrno != TSDB_CODE_SUCCESS) {
return terrno;
......@@ -904,38 +926,24 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return code;
}
int32_t oldReplica = pDb->cfg.replications;
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg;
pDb->cfgVersion++;
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb
.pObj = pDb,
.pMsg = pMsg,
.cb = mnodeAlterDbCb
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR;
code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
mnodeSendCreateVgroupMsg(pVgroup, NULL);
mnodeDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
if (oldReplica != pDb->cfg.replications) {
balanceNotify();
}
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
......@@ -948,28 +956,26 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_INVALID_DB;
}
int32_t code = mnodeAlterDb(pMsg->pDb, pAlter);
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to alter, invalid db option", pAlter->db);
return code;
}
mTrace("db:%s, all vgroups is altered", pMsg->pDb->name);
return TSDB_CODE_SUCCESS;
return mnodeAlterDb(pMsg->pDb, pAlter, pMsg);
}
static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SDbObj *pDb = pMsg->pDb;
mPrint("db:%s, drop db from sdb", pDb->name);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb
.pObj = pDb,
.pMsg = pMsg
};
int32_t code = sdbDeleteRow(&oper);
if (code != 0) {
code = TSDB_CODE_MND_SDB_ERROR;
if (code == TSDB_CODE_SUCCESS) {
mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
......
此差异已折叠。
......@@ -31,6 +31,8 @@
#include "mnodeShow.h"
#include "mnodeUser.h"
#include "tglobal.h"
static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0;
static SRpcIpSet tsMnodeIpSetForShell;
......@@ -333,7 +335,7 @@ static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
SUserObj *pUser = mnodeGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->pAcct->user, "root") != 0) {
if (strcmp(pUser->pAcct->user, TSDB_DEFAULT_USER) != 0) {
mnodeDecUserRef(pUser);
return TSDB_CODE_MND_NO_RIGHTS;
}
......@@ -401,9 +403,9 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
if (pDnode != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->dnodeEp, pShow->bytes[cols] - VARSTR_HEADER_SIZE);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->dnodeEp, pShow->bytes[cols]);
} else {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "invalid ep", pShow->bytes[cols] - VARSTR_HEADER_SIZE);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "invalid ep", pShow->bytes[cols]);
}
mnodeDecDnodeRef(pDnode);
......@@ -411,7 +413,7 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* roles = mnodeGetMnodeRoleStr(pMnode->role);
STR_TO_VARSTR(pWrite, roles);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, roles, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
此差异已折叠。
此差异已折叠。
......@@ -122,7 +122,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
SShowObj *pShow = (SShowObj *) calloc(1, showObjSize);
pShow->type = pShowMsg->type;
pShow->payloadLen = htons(pShowMsg->payloadLen);
strcpy(pShow->db, pShowMsg->db);
tstrncpy(pShow->db, pShowMsg->db, TSDB_DB_NAME_LEN);
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
pShow = mnodePutShowObj(pShow, showObjSize);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -43,7 +43,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg))
int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
if (pMsg->rpcMsg.pCont == NULL) {
mError("%p, msg:%s in mwrite queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
mError("app:%p:%p, msg:%s content is null", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN;
}
......@@ -54,27 +54,31 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet);
mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse);
mTrace("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
mTrace("app:%p:%p, mnode index:%d ip:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, ipSet->fqdn[i],
htons(ipSet->port[i]));
}
return TSDB_CODE_RPC_REDIRECT;
}
if (tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType] == NULL) {
mError("%p, msg:%s in mwrite queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
mError("app:%p:%p, msg:%s not processed", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
}
int32_t code = mnodeInitMsg(pMsg);
if (code != TSDB_CODE_SUCCESS) {
mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code));
mError("app:%p:%p, msg:%s not processed, reason:%s", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
tstrerror(code));
return code;
}
if (!pMsg->pUser->writeAuth) {
mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
mError("app:%p:%p, msg:%s not processed, no write auth", pMsg->rpcMsg.ahandle, pMsg,
taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_NO_RIGHTS;
}
......
......@@ -16,15 +16,11 @@
#ifndef TDENGINE_GC_HANDLE_H
#define TDENGINE_GC_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define GC_ROOT_URL_POS 0
#define GC_ACTION_URL_POS 1
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_TOKEN_H
#define TDENGINE_HTTP_TOKEN_H
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_CONTEXT_H
#define TDENGINE_HTTP_CONTEXT_H
#include "httpInt.h"
bool httpInitContexts();
void httpCleanupContexts();
const char *httpContextStateStr(HttpContextState state);
HttpContext *httpCreateContext(int32_t fd);
bool httpInitContext(HttpContext *pContext);
HttpContext *httpGetContext(void * pContext);
void httpReleaseContext(HttpContext *pContext);
void httpCloseContextByServer(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext);
void httpNotifyContextClose(HttpContext *pContext);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
#endif
此差异已折叠。
此差异已折叠。
......@@ -97,4 +97,8 @@ void httpJsonPrint(JsonBuf* buf, const char* json, int len);
// quick
void httpJsonPairStatus(JsonBuf* buf, int code);
// http json printer
JsonBuf* httpMallocJsonBuf(struct HttpContext* pContext);
void httpFreeJsonBuf(struct HttpContext* pContext);
#endif
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册