提交 f805fdea 编写于 作者: K Kaili Xu

Merge branch 'develop' into fix/TD-4324

sync with develop
......@@ -24,7 +24,7 @@ TDengine的安装非常简单,从下载到安装成功仅仅只要几秒钟。
## <a class="anchor" id="start"></a>轻松启动
安装成功后,用户可使用`systemctl`命令来启动TDengine的服务进程。
安装成功后,用户可使用 `systemctl` 命令来启动 TDengine 的服务进程。
```bash
$ systemctl start taosd
......@@ -35,21 +35,22 @@ $ systemctl start taosd
$ systemctl status taosd
```
如果TDengine服务正常工作,那么您可以通过TDengine的命令行程序`taos`来访问并体验TDengine。
如果 TDengine 服务正常工作,那么您可以通过 TDengine 的命令行程序 `taos` 来访问并体验 TDengine。
**注意:**
- systemctl命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo
- 为更好的获得产品反馈,改善产品,TDengine会采集基本的使用信息,但您可以修改系统配置文件taos.cfg里的配置参数telemetryReporting, 将其设为0,就可将其关闭。
- TDengine采用FQDN(一般就是hostname)作为节点的ID,为保证正常运行,需要给运行taosd的服务器配置好hostname,在客户端应用运行的机器配置好DNS服务或hosts文件,保证FQDN能够解析。
- systemctl 命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo 。
- 为更好的获得产品反馈,改善产品,TDengine 会采集基本的使用信息,但您可以修改系统配置文件 taos.cfg 里的配置参数 telemetryReporting, 将其设为 0,就可将其关闭。
- TDengine 采用 FQDN (一般就是 hostname )作为节点的 ID,为保证正常运行,需要给运行 taosd 的服务器配置好 hostname,在客户端应用运行的机器配置好 DNS 服务或 hosts 文件,保证 FQDN 能够解析。
- `systemctl stop taosd` 指令在执行后并不会马上停止 TDengine 服务,而是会等待系统中必要的落盘工作正常完成。在数据量很大的情况下,这可能会消耗较长时间。
* TDengine 支持在使用[`systemd`](https://en.wikipedia.org/wiki/Systemd)做进程服务管理的linux系统上安装,用`which systemctl`命令来检测系统中是否存在`systemd`包:
* TDengine 支持在使用 [`systemd`](https://en.wikipedia.org/wiki/Systemd) 做进程服务管理的 linux 系统上安装,用 `which systemctl` 命令来检测系统中是否存在 `systemd` 包:
```bash
$ which systemctl
```
如果系统中不支持systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。
如果系统中不支持 systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。
## <a class="anchor" id="console"></a>TDengine命令行程序
......
......@@ -129,7 +129,7 @@ taosd -C
- blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改)
- replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改)
- precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。
- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,3:同时打开缓存最近行和列功能,默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL:
......
......@@ -26,7 +26,7 @@
| TSDB_CODE_COM_OUT_OF_MEMORY | 0 | 0x0102 | "Out of memory" | -2147483390 |
| TSDB_CODE_COM_INVALID_CFG_MSG | 0 | 0x0103 | "Invalid config message" | -2147483389 |
| TSDB_CODE_COM_FILE_CORRUPTED | 0 | 0x0104 | "Data file corrupted" | -2147483388 |
| TSDB_CODE_TSC_INVALID_SQL | 0 | 0x0200 | "Invalid SQL statement" | -2147483136 |
| TSDB_CODE_TSC_INVALID_OPERATION | 0 | 0x0200 | "Invalid SQL statement" | -2147483136 |
| TSDB_CODE_TSC_INVALID_QHANDLE | 0 | 0x0201 | "Invalid qhandle" | -2147483135 |
| TSDB_CODE_TSC_INVALID_TIME_STAMP | 0 | 0x0202 | "Invalid combination of client/service time" | -2147483134 |
| TSDB_CODE_TSC_INVALID_VALUE | 0 | 0x0203 | "Invalid value in client" | -2147483133 |
......
......@@ -76,7 +76,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
4) 一条SQL 语句的最大长度为65480个字符;
5) 数据库还有更多与存储相关的配置参数,请参见 [服务端配置](https://www.taosdata.com/cn/documentation/taos-sql#management) 章节。
5) 数据库还有更多与存储相关的配置参数,请参见 [服务端配置](https://www.taosdata.com/cn/documentation/administrator#config) 章节。
- **显示系统当前参数**
......@@ -126,7 +126,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
```mysql
ALTER DATABASE db_name CACHELAST 0;
```
CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0,取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11 版本开始支持,修改后需要重启服务器生效。)
CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0,取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11.0 版本开始支持。从 2.1.1.0 版本开始,修改此参数后无需重启服务器即可生效。)
**Tips**: 以上所有参数修改后都可以用show databases来确认是否修改成功。
......@@ -399,7 +399,12 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM
INSERT INTO tb1_name (tb1_field1_name, ...) [USING stb1_name TAGS (tag_value1, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...
tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...;
```
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。
从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写:
```mysql
INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...;
```
注意:虽然两种写法都可以,但并不能在一条 SQL 语句中混用,否则会报语法错误。
**历史记录写入**:可使用IMPORT或者INSERT命令,IMPORT的语法,功能与INSERT完全一样。
......
......@@ -39,39 +39,29 @@ typedef struct SLocalDataSource {
} SLocalDataSource;
typedef struct SLocalMerger {
SLocalDataSource ** pLocalDataSrc;
SLocalDataSource **pLocalDataSrc;
int32_t numOfBuffer;
int32_t numOfCompleted;
int32_t numOfVnode;
SLoserTreeInfo * pLoserTree;
tFilePage * pResultBuf;
int32_t nResultBufSize;
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
SColumnModel* finalModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
bool orderPrjOnSTable; // projection query on stable
SLoserTreeInfo *pLoserTree;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor *pDesc;
tExtMemBuffer **pExtMemBuffer; // disk-based buffer
char *buf; // temp buffer
} SLocalMerger;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
SColumnModel* pFinalColModel; // colModel for final result
SColumnModel* pFFColModel;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
int32_t tscLocalReducerEnvCreate(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel,
int32_t numOfVnodes);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
int32_t numOfRows, int32_t orderType);
......@@ -81,12 +71,10 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
/*
* create local reducer to launch the second-stage reduce process at client site
*/
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql);
int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo *pQueryInfo, SLocalMerger **pMerger, int64_t id);
void tscDestroyLocalMerger(SSqlObj *pSql);
//int32_t tscDoLocalMerge(SSqlObj *pSql);
void tscDestroyLocalMerger(SLocalMerger* pLocalMerger);
#ifdef __cplusplus
}
......
......@@ -20,6 +20,7 @@
extern "C" {
#endif
#include "tsched.h"
#include "exception.h"
#include "os.h"
#include "qExtbuffer.h"
......@@ -36,6 +37,9 @@ extern "C" {
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
......@@ -59,7 +63,7 @@ typedef struct SJoinSupporter {
SArray* exprList;
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupInfo; // group by info
SGroupbyExpr groupInfo; // group by info
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
......@@ -90,22 +94,14 @@ typedef struct SVgroupTableInfo {
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
return pCmd->pQueryInfo[subClauseIndex];
}
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd);
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset);
......@@ -127,7 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo);
......@@ -136,13 +132,14 @@ bool isStabledev(SQueryInfo* pQueryInfo);
bool isTsCompQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscHasColumnFilter(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo);
......@@ -150,9 +147,9 @@ bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType, int16_t colId);
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql);
int32_t tscSetTableFullName(SName* pName, SStrToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertData(char* sqlstr);
......@@ -171,36 +168,49 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize);
int32_t tscFieldInfoSetSize(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
int32_t tscFieldInfoSetSize(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
int32_t tscGetResRowLength(SArray* pExprList);
SExprInfo* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType);
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
SExprInfo* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscSqlExprInfoDestroy(SArray* pExprInfo);
size_t tscNumOfExprs(SQueryInfo* pQueryInfo);
SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy);
void tscExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
......@@ -222,14 +232,14 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int32_t tscAddQueryInfo(SSqlCmd *pCmd);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
......@@ -243,12 +253,11 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscDoQuery(SSqlObj* pSql);
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
......@@ -279,7 +288,7 @@ void registerSqlObj(SSqlObj* pSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlCmd* pCmd);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
......@@ -295,6 +304,11 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
......@@ -309,10 +323,12 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, void* buf);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -42,12 +42,6 @@ extern "C" {
struct SSqlInfo;
struct SLocalMerger;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct STableComInfo {
......@@ -85,10 +79,10 @@ typedef struct STableMeta {
} STableMeta;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
uint32_t tableMetaSize;
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
......@@ -137,8 +131,8 @@ typedef struct SJoinNode {
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode* joinTables[TSDB_MAX_JOIN_TABLE_NUM];
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
......@@ -205,10 +199,11 @@ typedef struct SQueryInfo {
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SSqlGroupbyExpr groupbyExpr; // groupby tags info
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*>
SArray * exprList1; // final exprlist in case of arithmetic expression exists
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
......@@ -232,30 +227,50 @@ typedef struct SQueryInfo {
int32_t bufLen;
char* buf;
SQInfo* pQInfo; // global merge operator
SArray* pDSOperator; // data source operator
SArray* pPhyOperator; // physical query execution plan
SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
int32_t havingFieldNum;
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
} SQueryInfo;
typedef struct {
STableMeta *pTableMeta;
SVgroupsInfo *pVgroupInfo;
} STableMetaVgroupInfo;
typedef struct SInsertStatementParam {
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables; // number of tables in table name list
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t schemaAttached; // denote if submit block is built with table schema or not
STagData tagData; // NOTE: pTagData->data is used as a variant length array
char msg[512]; // error message
char *sql; // current sql statement position
uint32_t insertType; // insert data from [file|sql statement| bound statement]
} SInsertStatementParam;
// TODO extract sql parser supporter
typedef struct {
int command;
uint8_t msgType;
SInsertStatementParam insertParam;
char reserve1[3]; // fix bus error on arm32
bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
union {
int32_t count;
int32_t numOfTablesInSubmit;
};
uint32_t insertType; // TODO remove it
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
char reserve2[3]; // fix bus error on arm32
int16_t numOfCols;
......@@ -264,25 +279,13 @@ typedef struct {
char * payload;
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
int32_t clauseIndex; // index of multiple subclause query
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
SHashObj *pTableMetaMap; // local buffer to keep the queried table meta, before validating the AST
SQueryInfo *pQueryInfo;
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int32_t resColumnId;
} SSqlCmd;
typedef struct SResRec {
......@@ -443,7 +446,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
void destroyTableNameList(SSqlCmd* pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......@@ -489,7 +492,7 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
extern int32_t sentinel;
extern SHashObj *tscVgroupMap;
......@@ -505,7 +508,7 @@ extern int tscNumOfObj; // number of existed sqlObj in current process.
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
int16_t getNewResColId(SQueryInfo* pQueryInfo);
int16_t getNewResColId(SSqlCmd* pCmd);
#ifdef __cplusplus
}
......
......@@ -218,11 +218,19 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeBatchImp
* Method: closeStmt
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/**
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
#ifdef __cplusplus
}
#endif
......
......@@ -749,7 +749,6 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
}
jniDebug("jobj:%p, conn:%p, set stmt bind table name:%s", jobj, tsconn, name);
(*env)->ReleaseStringUTFChars(env, jname, name);
return JNI_SUCCESS;
}
......@@ -762,7 +761,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL;
......@@ -777,14 +776,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
}
len = (*env)->GetArrayLength(env, lengthList);
char *lengthArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
char *lengthArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
char *nullArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
char *nullArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
}
......@@ -799,22 +798,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
b->length = (int32_t*)lengthArray;
// set the length and is_null array
switch(dataType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
int32_t bytes = tDataTypes[dataType].bytes;
for(int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY: {
// do nothing
if (!IS_VAR_DATA_TYPE(dataType)) {
int32_t bytes = tDataTypes[dataType].bytes;
for (int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
}
}
......@@ -878,3 +865,74 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(JNIEnv *env, jobject jobj,
jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte*) typeArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
if ((*env)->ExceptionCheck(env)) {
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char* curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for(int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t*) &lengthArray[i];
curTags += lengthArray[i];
}
int32_t code = taos_stmt_set_tbname_tags((void*)stmt, name, tagsBind);
int32_t nTags = (int32_t) numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
}
......@@ -59,6 +59,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pCmd->curSql = pSql->sqlstr;
pCmd->resColumnId = TSDB_RES_COL_ID;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
......@@ -69,7 +70,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo);
}
......@@ -127,7 +128,8 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling;
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
......@@ -220,6 +222,17 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
tscResetForNextRetrieve(pRes);
// handle the sub queries of join query
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
return;
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) {
......@@ -231,7 +244,8 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
* all available virtual nodes in current clause has been checked already, now try the
* next one in the following union subclause
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling; // todo refactor
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
......@@ -255,7 +269,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd);
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd);
tscBuildAndSendRequest(pSql, pQueryInfo1);
}
}
......@@ -317,26 +331,38 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM
// update the pExpr info, colList info, number of table columns
// TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case.
if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) {
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema *pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr *pExpr = &(tscSqlExprGet(pQueryInfo, i)->base);
SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base);
// update the table uid
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
if (pExpr->colInfo.colIndex >= 0) {
int32_t index = pExpr->colInfo.colIndex;
if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) ||
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < numOfCols || index >= (numOfCols + numOfTags)))) {
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < 0 || index >= numOfTags))) {
return pSql->retryReason;
}
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
if ((pTagSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pTagSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else if (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag)) {
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else { // do nothing for udc
}
}
}
......@@ -374,12 +400,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg);
if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send the corresponding query", pSql->self);
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("0x%" PRIx64 " update cached table-meta, continue to process sql and send the corresponding query", pSql->self);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -401,42 +427,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // continue to process normal async query
if (pCmd->parseFinished) {
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send corresponding query", pSql->self);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
}
assert(pCmd->command != TSDB_SQL_INSERT);
if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64" redo parse sql string and proceed", pSql->self);
pCmd->parseFinished = false;
tscResetSqlCmd(pCmd, true);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscBuildAndSendRequest(pSql, NULL);
} else { // in all other cases, simple retry
tscBuildAndSendRequest(pSql, NULL);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
tscDebug("0x%"PRIx64" continue parse sql after get table meta", pSql->self);
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -446,8 +438,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
......@@ -457,59 +449,52 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
(*pSql->fp)(pSql->param, pSql, code);
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
} else {
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
tscImportDataFromFile(pSql);
} else {
tscHandleMultivnodeInsert(pSql);
}
}
} else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again",
pSql->self);
tscResetSqlCmd(pCmd, false);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else {
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
executeQuery(pSql, pQueryInfo1);
tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
}
}
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} else { // stream computing
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo1);
}
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
} else { // stream computing
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command);
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscNumOfExprs(pQueryInfo) == 0) {
tsParseSql(pSql, false);
}
(*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
// tscDoQuery(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return;
_error:
......
......@@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
// one column for each row
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -154,14 +154,14 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pSql->cmd.numOfCols = numOfCols;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE};
tstrncpy(f.name, "Field", sizeof(f.name));
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false);
rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE);
......@@ -171,7 +171,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Type", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
-1000, typeColLength, false);
rowLen += typeColLength + VARSTR_HEADER_SIZE;
......@@ -181,7 +181,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Length", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
-1000, sizeof(int32_t), false);
rowLen += sizeof(int32_t);
......@@ -191,7 +191,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Note", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
-1000, noteColLength, false);
rowLen += noteColLength + VARSTR_HEADER_SIZE;
......@@ -199,7 +199,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
}
static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
......@@ -390,7 +390,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
SColumnIndex index = {0};
pSql->cmd.numOfCols = 2;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f;
......@@ -405,7 +405,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
rowLen += f.bytes;
......@@ -418,7 +418,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false);
rowLen += ddlLen + VARSTR_HEADER_SIZE;
......@@ -428,7 +428,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const char *tableName, const char *ddl) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t numOfRows = 1;
if (strlen(ddl) == 0) {
......@@ -445,7 +445,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
return 0;
}
static int32_t tscSCreateBuildResult(SSqlObj *pSql, BuildType type, const char *str, const char *result) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t rowLen = tscSCreateBuildResultFields(pSql, type, result);
tscFieldInfoUpdateOffset(pQueryInfo);
......@@ -532,7 +532,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
}
buf[0] = 0;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
free(buf);
......@@ -553,7 +553,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
return TSDB_CODE_SUCCESS;
}
static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -607,7 +607,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
}
static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -634,7 +634,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, char *ddl) {
char *result = ddl;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -675,7 +675,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pTableMetaInfo->pTableMeta != NULL);
......@@ -704,7 +704,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
}
static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -730,7 +730,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
return TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY;
......@@ -757,7 +757,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
extractDBName(pSql->pTscObj->db, db);
pthread_mutex_unlock(&pSql->pTscObj->mutex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -784,7 +784,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static int32_t tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -807,7 +807,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) {
}
static int32_t tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -859,7 +859,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return pSql->res.code;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
int32_t val = 1;
......@@ -873,7 +873,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pCmd->numOfCols = 1;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
......@@ -928,7 +928,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
} else if (pCmd->command == TSDB_SQL_SERV_STATUS) {
pRes->code = tscProcessServStatus(pSql);
} else {
pRes->code = TSDB_CODE_TSC_INVALID_SQL;
pRes->code = TSDB_CODE_TSC_INVALID_OPERATION;
tscError("0x%"PRIx64" not support command:%d", pSql->self, pCmd->command);
}
......
此差异已折叠。
......@@ -107,7 +107,7 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
}
if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (timePrec == TSDB_TIME_PRECISION_MILLI) {
......@@ -441,7 +441,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, SSqlCmd *pCmd, int1
*str += index;
if (sToken.type == TK_QUESTION) {
if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
if (pCmd->insertParam.insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
return tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str);
}
......@@ -647,7 +647,7 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta,
pBlocks->sversion = pTableMeta->sversion;
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
pBlocks->numOfRows += numOfRows;
return TSDB_CODE_SUCCESS;
......@@ -708,7 +708,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlock
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
code = TSDB_CODE_TSC_INVALID_SQL;
code = TSDB_CODE_TSC_INVALID_OPERATION;
char tmpTokenBuf[16*1024] = {0}; // used for deleting Escape character: \\, \', \"
int32_t numOfRows = 0;
......@@ -747,12 +747,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
const int32_t STABLE_INDEX = 1;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
char *sql = *sqlstr;
pSql->cmd.autoCreated = false;
// get the token of specified table
index = 0;
tableToken = tStrGetToken(sql, &index, false);
......@@ -786,7 +784,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
}
if (numOfColList == 0 && (*boundColumn) != NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);
......@@ -802,7 +800,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
}
STableMetaInfo *pSTableMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
code = tscSetTableFullName(pSTableMetaInfo, &sToken, pSql);
code = tscSetTableFullName(&pSTableMetaInfo->name, &sToken, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -879,7 +877,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
if (TK_ILLEGAL == sToken.type) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (sToken.n == 0 || sToken.type == TK_RP) {
......@@ -961,7 +959,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
}
if (numOfColsAfterTags == 0 && (*boundColumn) != NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
sToken = tStrGetToken(sql, &index, false);
......@@ -973,13 +971,13 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
}
int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
int32_t ret = tscSetTableFullName(&pTableMetaInfo->name, &tableToken, pSql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
......@@ -991,7 +989,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
sql = sToken.z;
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
......@@ -1015,12 +1013,17 @@ int validateTableName(char *tblName, int len, SStrToken* psTblToken) {
return tscValidateName(psTblToken);
}
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
if (pCmd->dataSourceType != 0 && pCmd->dataSourceType != type) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
static int32_t validateDataSource(SSqlCmd *pCmd, int32_t type, const char *sql) {
uint32_t *insertType = &pCmd->insertParam.insertType;
if (*insertType == TSDB_QUERY_TYPE_STMT_INSERT && type == TSDB_QUERY_TYPE_INSERT) {
return TSDB_CODE_SUCCESS;
}
if ((*insertType) != 0 && (*insertType) != type) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mixed up", sql);
}
pCmd->dataSourceType = type;
*insertType = type;
return TSDB_CODE_SUCCESS;
}
......@@ -1090,7 +1093,6 @@ static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SS
_clean:
pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code;
}
......@@ -1106,7 +1108,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int32_t totalNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo != NULL);
STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0);
......@@ -1120,9 +1122,9 @@ int tsParseInsertSql(SSqlObj *pSql) {
return code;
}
if (NULL == pCmd->pTableBlockHashList) {
pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (NULL == pCmd->pTableBlockHashList) {
if (NULL == pCmd->insertParam.pTableBlockHashList) {
pCmd->insertParam.pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (NULL == pCmd->insertParam.pTableBlockHashList) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _clean;
}
......@@ -1130,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
str = pCmd->curSql;
}
tscDebug("0x%"PRIx64" create data block list hashList:%p", pSql->self, pCmd->pTableBlockHashList);
tscDebug("0x%"PRIx64" create data block list hashList:%p", pSql->self, pCmd->insertParam.pTableBlockHashList);
while (1) {
int32_t index = 0;
......@@ -1142,7 +1144,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
* if the data is from the data file, no data has been generated yet. So, there no data to
* merge or submit, save the file path and parse the file in other routines.
*/
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
goto _clean;
}
......@@ -1151,7 +1153,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
* Otherwise, create the first submit block and submit to virtual node.
*/
if (totalNum == 0) {
code = TSDB_CODE_TSC_INVALID_SQL;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _clean;
} else {
break;
......@@ -1168,7 +1170,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean;
}
if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _clean;
}
......@@ -1203,7 +1205,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (sToken.type == TK_FILE) {
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
if (validateDataSource(pCmd, TSDB_QUERY_TYPE_FILE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean;
}
......@@ -1236,12 +1238,12 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (bindedColumns == NULL) {
STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
if (validateDataSource(pCmd, TSDB_QUERY_TYPE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean;
}
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
int32_t ret = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
&dataBuf, NULL);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -1254,14 +1256,14 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
} else { // bindedColumns != NULL
// insert into tablename(col1, col2,..., coln) values(v1, v2,... vn);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, 0)->pTableMeta;
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
if (validateDataSource(pCmd, TSDB_QUERY_TYPE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean;
}
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
int32_t ret = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
&dataBuf, NULL);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -1297,7 +1299,8 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean;
}
if ((pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
// merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
goto _clean;
}
......@@ -1308,7 +1311,6 @@ int tsParseInsertSql(SSqlObj *pSql) {
_clean:
pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code;
}
......@@ -1326,9 +1328,8 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
pCmd->count = 0;
pCmd->command = TSDB_SQL_INSERT;
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
sToken = tStrGetToken(pSql->sqlstr, &index, false);
if (sToken.type != TK_INTO) {
......@@ -1343,11 +1344,11 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
int32_t ret = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
if ((!pCmd->parseFinished) && (!initial)) {
if (!initial) {
tscDebug("0x%"PRIx64" resume to parse sql: %s", pSql->self, pCmd->curSql);
}
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
return ret;
}
......@@ -1357,31 +1358,32 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
return ret;
}
// make a backup as tsParseInsertSql may modify the string
char* sqlstr = strdup(pSql->sqlstr);
ret = tsParseInsertSql(pSql);
if ((sqlstr == NULL) || (pSql->parseRetry >= 1) ||
(ret != TSDB_CODE_TSC_SQL_SYNTAX_ERROR && ret != TSDB_CODE_TSC_INVALID_SQL)) {
free(sqlstr);
} else {
assert(ret == TSDB_CODE_SUCCESS || ret == TSDB_CODE_TSC_ACTION_IN_PROGRESS || ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION);
if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) {
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
pSql->parseRetry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
ret = tsParseInsertSql(pSql);
}
}
} else {
SSqlInfo SQLInfo = qSqlParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
SSqlInfo sqlInfo = qSqlParse(pSql->sqlstr);
ret = tscValidateSqlInfo(pSql, &sqlInfo);
if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
ret = tscValidateSqlInfo(pSql, &sqlInfo);
}
SqlInfoDestroy(&SQLInfo);
SqlInfoDestroy(&sqlInfo);
}
/*
......@@ -1398,8 +1400,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
SSqlCmd *pCmd = &pSql->cmd;
pSql->res.numOfRows = 0;
assert(pCmd->numOfClause == 1);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, 0)->pTableMeta;
SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData);
code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
......@@ -1411,7 +1412,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code;
}
STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, 0);
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1461,17 +1462,17 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
// accumulate the total submit records
pParentSql->res.numOfRows += pSql->res.numOfRows;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
destroyTableNameList(pCmd);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
if (pCmd->pTableBlockHashList == NULL) {
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pCmd->pTableBlockHashList == NULL) {
if (pCmd->insertParam.pTableBlockHashList == NULL) {
pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pCmd->insertParam.pTableBlockHashList == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
......@@ -1479,7 +1480,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
STableDataBlocks *pTableDataBlock = NULL;
int32_t ret =
tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL);
if (ret != TSDB_CODE_SUCCESS) {
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1561,8 +1562,8 @@ void tscImportDataFromFile(SSqlObj *pSql) {
return;
}
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0);
pCmd->active = pCmd->pQueryInfo[0];
assert(TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT) && strlen(pCmd->payload) != 0);
pCmd->active = pCmd->pQueryInfo;
SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -373,11 +373,15 @@ int taos_num_fields(TAOS_RES *res) {
if (pSql == NULL || pSql->signature != pSql) return 0;
int32_t num = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return num;
}
while(pQueryInfo->pDownstream != NULL) {
pQueryInfo = pQueryInfo->pDownstream;
}
size_t numOfCols = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfCols; ++i) {
SInternalField* pInfo = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
......@@ -408,7 +412,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return NULL;
}
......@@ -560,7 +564,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
return true;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return true;
......@@ -614,7 +618,7 @@ int taos_errno(TAOS_RES *tres) {
* why the sql is invalid
*/
static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
if (code != TSDB_CODE_TSC_INVALID_SQL
if (code != TSDB_CODE_TSC_INVALID_OPERATION
&& code != TSDB_CODE_TSC_SQL_SYNTAX_ERROR) {
return false;
}
......@@ -673,7 +677,7 @@ char *taos_get_client_info() { return version; }
static void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return;
......@@ -724,7 +728,7 @@ void taos_stop_query(TAOS_RES *res) {
// set the error code for master pSqlObj firstly
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->rpcRid <= 0);
......@@ -754,7 +758,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return true;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return true;
}
......@@ -829,9 +833,9 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
case TSDB_DATA_TYPE_NCHAR: {
int32_t charLen = varDataLen((char*)row[i] - VARSTR_HEADER_SIZE);
if (fields[i].type == TSDB_DATA_TYPE_BINARY) {
assert(charLen <= fields[i].bytes);
assert(charLen <= fields[i].bytes && charLen >= 0);
} else {
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE);
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0);
}
memcpy(str + len, row[i], charLen);
......@@ -868,15 +872,11 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
pCmd->resColumnId = TSDB_RES_COL_ID;
tscDebug("0x%"PRIx64" Valid SQL: %s pObj:%p", pSql->self, sql, pObj);
......@@ -896,10 +896,10 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower(pSql->sqlstr, sql);
pCmd->curSql = NULL;
if (NULL != pCmd->pTableBlockHashList) {
taosHashCleanup(pCmd->pTableBlockHashList);
pCmd->pTableBlockHashList = NULL;
// pCmd->curSql = NULL;
if (NULL != pCmd->insertParam.pTableBlockHashList) {
taosHashCleanup(pCmd->insertParam.pTableBlockHashList);
pCmd->insertParam.pTableBlockHashList = NULL;
}
pSql->fp = asyncCallback;
......@@ -921,90 +921,19 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return code;
}
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
// must before clean the sqlcmd object
tscResetSqlCmd(&pSql->cmd, false);
SSqlCmd *pCmd = &pSql->cmd;
pCmd->command = TSDB_SQL_MULTI_META;
pCmd->count = 0;
int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
char *str = (char *)tblNameList;
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
if (pQueryInfo == NULL) {
pSql->res.code = terrno;
return terrno;
}
STableMetaInfo *pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) {
return code;
}
char *nextStr;
char tblName[TSDB_TABLE_FNAME_LEN];
int payloadLen = 0;
char *pMsg = pCmd->payload;
while (1) {
nextStr = strchr(str, ',');
if (nextStr == NULL) {
break;
}
memcpy(tblName, str, nextStr - str);
int32_t len = (int32_t)(nextStr - str);
tblName[len] = '\0';
str = nextStr + 1;
len = (int32_t)strtrim(tblName);
SStrToken sToken = {.n = len, .type = TK_ID, .z = tblName};
tGetToken(tblName, &sToken.type);
// Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "table name is invalid");
return code;
}
if ((code = tscSetTableFullName(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
return code;
}
if (++pCmd->count > TSDB_MULTI_TABLEMETA_MAX_NUM) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "tables over the max number");
return code;
}
int32_t xlen = tNameLen(&pTableMetaInfo->name);
if (payloadLen + xlen + 128 >= pCmd->allocSize) {
char *pNewMem = realloc(pCmd->payload, pCmd->allocSize + tblListLen);
if (pNewMem == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
sprintf(pCmd->payload, "failed to allocate memory");
return code;
}
pCmd->payload = pNewMem;
pCmd->allocSize = pCmd->allocSize + tblListLen;
pMsg = pCmd->payload;
}
char n[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, n);
payloadLen += sprintf(pMsg + payloadLen, "%s,", n);
void loadMultiTableMetaCallback(void *param, TAOS_RES *res, int code) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
if (pSql == NULL) {
return;
}
*(pMsg + payloadLen) = '\0';
pCmd->payloadLen = payloadLen + 1;
taosReleaseRef(tscObjRef, pSql->self);
pSql->res.code = code;
tsem_post(&pSql->rspSem);
}
return TSDB_CODE_SUCCESS;
static void freeElem(void* p) {
tfree(*(char**)p);
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
......@@ -1020,38 +949,28 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
pRes->code = 0;
pRes->numOfTotal = 0; // the number of getting table meta from server
pRes->numOfClauseTotal = 0;
assert(pSql->fp == NULL);
tscDebug("0x%"PRIx64" tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
pSql->fp = NULL; // todo set the correct callback function pointer
pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t tblListLen = (int32_t)strlen(tableNameList);
if (tblListLen > MAX_TABLE_NAME_LENGTH) {
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, tblListLen, MAX_TABLE_NAME_LENGTH);
int32_t length = (int32_t)strlen(tableNameList);
if (length > MAX_TABLE_NAME_LENGTH) {
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, length, MAX_TABLE_NAME_LENGTH);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
char *str = calloc(1, tblListLen + 1);
char *str = calloc(1, length + 1);
if (str == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscError("0x%"PRIx64" failed to allocate sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strtolower(str, tableNameList);
int32_t code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen);
/*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscBuildAndSendRequest()
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes->qId = 0;
SArray* plist = taosArrayInit(4, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(4, POINTER_BYTES);
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1059,12 +978,23 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return code;
}
tscDoQuery(pSql);
registerSqlObj(pSql);
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
tscDebug("0x%"PRIx64" load multi-table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj);
if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
code = TSDB_CODE_SUCCESS;
}
taosArrayDestroyEx(plist, freeElem);
taosArrayDestroyEx(vgroupList, freeElem);
if (code != TSDB_CODE_SUCCESS) {
tscFreeRegisteredSqlObj(pSql);
return code;
}
tsem_wait(&pSql->rspSem);
tscFreeRegisteredSqlObj(pSql);
return code;
}
......@@ -37,7 +37,7 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
static bool isProjectStream(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
SExprInfo *pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId != TSDB_FUNC_PRJ) {
return false;
}
......@@ -89,12 +89,12 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0);
code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
}
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -138,7 +138,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream->numOfRes = 0; // reset the numOfRes.
SSqlObj *pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
if (pStream->isProject) {
......@@ -197,7 +197,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0);
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
......@@ -224,7 +224,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
......@@ -273,7 +273,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
......@@ -444,7 +444,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
sprintf(pSql->cmd.payload, "the interval value is 0");
......@@ -494,7 +494,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay
......@@ -556,7 +556,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -614,16 +614,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return NULL;
}
pStream->stime = stime;
pStream->fp = fp;
pStream->stime = stime;
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pStream->param = param;
pStream->pSql = pSql;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
......@@ -632,14 +632,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
strtolower(pSql->sqlstr, sqlstr);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
pSql->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pSql->rspSem, 0, 0);
registerSqlObj(pSql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
......
......@@ -151,6 +151,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qId = 0;
pRes->numOfRows = 1;
pCmd->resColumnId = TSDB_RES_COL_ID;
code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
......@@ -173,7 +174,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
if (pSql->cmd.command != TSDB_SQL_SELECT && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
line = __LINE__;
code = TSDB_CODE_TSC_INVALID_SQL;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto fail;
}
......@@ -266,7 +267,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
pSub->lastSyncTime = taosGetTimestampMs();
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSubscriptionProgress target = {.uid = pTableMeta->id.uid, .key = 0};
......@@ -284,7 +285,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
size_t numOfTables = taosArrayGetSize(tables);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
for( size_t i = 0; i < numOfTables; i++ ) {
STidTags* tt = taosArrayGet( tables, i );
......@@ -304,7 +305,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
taosArrayDestroy(tables);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
return 1;
}
......@@ -503,8 +504,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlObj *pSql = pSub->pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
size_t size = taosArrayGetSize(pSub->progress);
......
此差异已折叠。
此差异已折叠。
......@@ -234,6 +234,7 @@ typedef struct SDataCol {
int len; // column data length
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
void * pData; // Actual data pointer
TSKEY ts; // only used in last NULL column
} SDataCol;
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
......
......@@ -44,8 +44,8 @@ typedef struct SResPair {
// the structure for sql function in select clause
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
char token[TSDB_COL_NAME_LEN]; // original token
SColIndex colInfo;
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
......@@ -92,8 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
//SSchema tGetTbnameColumnSchema();
SSchema tGetBlockDistColumnSchema();
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
......
......@@ -2569,6 +2569,7 @@ _arithmetic_operator_fn_t getArithmeticOperatorFn(int32_t arithmeticOptr) {
case TSDB_BINARY_OP_REMAINDER:
return vectorRemainder;
default:
assert(0);
return NULL;
}
}
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <texpr.h>
#include "os.h"
#include "texpr.h"
......@@ -465,27 +466,29 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
return expr;
}
tExprNode* exprdup(tExprNode* pTree) {
if (pTree == NULL) {
tExprNode* exprdup(tExprNode* pNode) {
if (pNode == NULL) {
return NULL;
}
tExprNode* pNode = calloc(1, sizeof(tExprNode));
if (pTree->nodeType == TSQL_NODE_EXPR) {
tExprNode* pLeft = exprdup(pTree->_node.pLeft);
tExprNode* pRight = exprdup(pTree->_node.pRight);
pNode->nodeType = TSQL_NODE_EXPR;
pNode->_node.pLeft = pLeft;
pNode->_node.pRight = pRight;
} else if (pTree->nodeType == TSQL_NODE_VALUE) {
pNode->pVal = calloc(1, sizeof(tVariant));
tVariantAssign(pNode->pVal, pTree->pVal);
} else if (pTree->nodeType == TSQL_NODE_COL) {
pNode->pSchema = calloc(1, sizeof(SSchema));
*pNode->pSchema = *pTree->pSchema;
tExprNode* pCloned = calloc(1, sizeof(tExprNode));
if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprNode* pLeft = exprdup(pNode->_node.pLeft);
tExprNode* pRight = exprdup(pNode->_node.pRight);
pCloned->_node.pLeft = pLeft;
pCloned->_node.pRight = pRight;
pCloned->_node.optr = pNode->_node.optr;
pCloned->_node.hasPK = pNode->_node.hasPK;
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
pCloned->pVal = calloc(1, sizeof(tVariant));
tVariantAssign(pCloned->pVal, pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
pCloned->pSchema = calloc(1, sizeof(SSchema));
*pCloned->pSchema = *pNode->pSchema;
}
return pNode;
pCloned->nodeType = pNode->nodeType;
return pCloned;
}
......@@ -310,6 +310,16 @@ public class TSDBJNIConnector {
private native int setBindTableNameImp(long stmt, String name, long conn);
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(),
nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
}
}
private native int setTableNameTagsImp(long stmt, String name, int numOfTags, byte[] tags, byte[] typeList, byte[] lengthList, byte[] nullList, long conn);
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows,int columnIndex) throws SQLException {
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
......
......@@ -86,6 +86,17 @@ static SStep tsDnodeSteps[] = {
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
};
static SStep tsDnodeCompactSteps[] = {
{"dnode-tfile", tfInit, tfCleanup},
{"dnode-storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnode-eps", dnodeInitEps, dnodeCleanupEps},
{"dnode-wal", walInit, walCleanUp},
{"dnode-mread", dnodeInitMRead, NULL},
{"dnode-mwrite", dnodeInitMWrite, NULL},
{"dnode-mpeer", dnodeInitMPeer, NULL},
{"dnode-modules", dnodeInitModules, dnodeCleanupModules},
};
static int dnodeCreateDir(const char *dir) {
if (mkdir(dir, 0755) != 0 && errno != EEXIST) {
return -1;
......@@ -95,13 +106,23 @@ static int dnodeCreateDir(const char *dir) {
}
static void dnodeCleanupComponents() {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeSteps, stepSize);
if (!tsCompactMnodeWal) {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeSteps, stepSize);
} else {
int32_t stepSize = sizeof(tsDnodeCompactSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeCompactSteps, stepSize);
}
}
static int32_t dnodeInitComponents() {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeSteps, stepSize);
if (!tsCompactMnodeWal) {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeSteps, stepSize);
} else {
int32_t stepSize = sizeof(tsDnodeCompactSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeCompactSteps, stepSize);
}
}
static int32_t dnodeInitTmr() {
......@@ -219,14 +240,20 @@ static int32_t dnodeInitStorage() {
if (tsCompactMnodeWal == 1) {
sprintf(tsMnodeTmpDir, "%s/mnode_tmp", tsDataDir);
tfsRmdir(tsMnodeTmpDir);
if (taosDirExist(tsMnodeTmpDir)) {
dError("mnode_tmp dir already exist in %s,quit compact job", tsMnodeTmpDir);
return -1;
}
if (dnodeCreateDir(tsMnodeTmpDir) < 0) {
dError("failed to create dir: %s, reason: %s", tsMnodeTmpDir, strerror(errno));
return -1;
}
sprintf(tsMnodeBakDir, "%s/mnode_bak", tsDataDir);
//tfsRmdir(tsMnodeBakDir);
if (taosDirExist(tsMnodeBakDir)) {
dError("mnode_bak dir already exist in %s,quit compact job", tsMnodeBakDir);
return -1;
}
}
//TODO(dengyihao): no need to init here
if (dnodeCreateDir(tsMnodeDir) < 0) {
......
......@@ -112,6 +112,7 @@ typedef struct TAOS_MULTI_BIND {
TAOS_STMT *taos_stmt_init(TAOS *taos);
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
......
......@@ -33,6 +33,8 @@ extern "C" {
#endif
#define TSWINDOW_INITIALIZER ((STimeWindow) {INT64_MIN, INT64_MAX})
#define TSWINDOW_DESC_INITIALIZER ((STimeWindow) {INT64_MAX, INT64_MIN})
#define TSKEY_INITIAL_VAL INT64_MIN
// Bytes for each type.
......@@ -298,7 +300,7 @@ do { \
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 1
#define TSDB_MAX_DB_CACHE_LAST_ROW 3
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MIN_FSYNC_PERIOD 0
......@@ -345,6 +347,7 @@ do { \
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
......
......@@ -74,7 +74,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010A) //"Ref is not there")
//client
#define TSDB_CODE_TSC_INVALID_SQL TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid SQL statement")
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle")
#define TSDB_CODE_TSC_INVALID_TIME_STAMP TAOS_DEF_ERROR_CODE(0, 0x0202) //"Invalid combination of client/service time")
#define TSDB_CODE_TSC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x0203) //"Invalid value in client")
......
......@@ -84,7 +84,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "tables-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE, "retrieve" )
......@@ -294,6 +294,8 @@ typedef struct {
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
// if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table
int8_t supertable;
int8_t igNotExists;
} SCMDropTableMsg;
......@@ -703,8 +705,9 @@ typedef struct {
} STableInfoMsg;
typedef struct {
int32_t numOfVgroups;
int32_t numOfTables;
char tableIds[];
char tableNames[];
} SMultiTableInfoMsg;
typedef struct SSTableVgroupMsg {
......@@ -753,8 +756,9 @@ typedef struct STableMetaMsg {
typedef struct SMultiTableMeta {
int32_t numOfTables;
int32_t numOfVgroup;
int32_t contLen;
char metas[];
char meta[];
} SMultiTableMeta;
typedef struct {
......
......@@ -69,9 +69,13 @@ typedef struct {
int8_t precision;
int8_t compression;
int8_t update;
int8_t cacheLastRow;
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
} STsdbCfg;
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)
// --------- TSDB REPOSITORY USAGE STATISTICS
typedef struct {
int64_t totalStorage; // total bytes occupie
......@@ -261,6 +265,12 @@ TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
SMemRef *pRef);
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef);
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle);
/**
* get the queried table object list
* @param pHandle
......
此差异已折叠。
......@@ -121,7 +121,7 @@ int32_t mnodeStartSystem() {
int32_t mnodeInitSystem() {
mnodeInitTimer();
if (mnodeNeedStart()) {
if (mnodeNeedStart() || tsCompactMnodeWal) {
return mnodeStartSystem();
}
return 0;
......
......@@ -690,7 +690,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
pthread_mutex_unlock(&tsSdbMgmt.mutex);
// from app, row is created
if (pRow != NULL) {
if (pRow != NULL && tsCompactMnodeWal != 1) {
// forward to peers
pRow->processedCount = 0;
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
......@@ -713,7 +713,9 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
// even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
if (tsCompactMnodeWal != 1) {
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
}
// from wal or forward msg, row not created, should add into hash
if (action == SDB_ACTION_INSERT) {
......
此差异已折叠。
......@@ -21,6 +21,7 @@ extern "C" {
#endif
void taosRemoveDir(char *rootDir);
bool taosDirExist(const char* dirname);
int32_t taosMkDir(const char *pathname, mode_t mode);
void taosRemoveOldLogFiles(char *rootDir, int32_t keepDays);
int32_t taosRename(char *oldName, char *newName);
......
......@@ -45,6 +45,10 @@ void taosRemoveDir(char *rootDir) {
uInfo("dir:%s is removed", rootDir);
}
bool taosDirExist(const char* dirname) {
return access(dirname, F_OK) == 0;
}
int taosMkDir(const char *path, mode_t mode) {
int code = mkdir(path, 0755);
if (code < 0 && errno == EEXIST) code = 0;
......
......@@ -87,12 +87,12 @@ static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t time
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t daylight) {
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, time, timePrec);
} else {
return (*parseLocaltimeFp[daylight])(timestr, time, timePrec);
return (*parseLocaltimeFp[day_light])(timestr, time, timePrec);
}
}
......
......@@ -165,7 +165,7 @@ void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) {
}
}
httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_SQL & 0XFFFF, temp);
httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_OPERATION & 0XFFFF, temp);
}
void httpSendSuccResp(HttpContext *pContext, char *desc) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册