未验证 提交 5a06bb8a 编写于 作者: H hzcheng 提交者: GitHub

Merge branch 'master' into master

...@@ -6,10 +6,10 @@ We appreciate contributions from all developers. Feel free to follow us, fork th ...@@ -6,10 +6,10 @@ We appreciate contributions from all developers. Feel free to follow us, fork th
Any users can report bugs to us through the [github issue tracker](https://github.com/taosdata/TDengine/issues). We appreciate a detailed description of the problem you met. It is better to provide the detailed steps on reproducing the bug. Otherwise, an appendix with log files generated by the bug is welcome. Any users can report bugs to us through the [github issue tracker](https://github.com/taosdata/TDengine/issues). We appreciate a detailed description of the problem you met. It is better to provide the detailed steps on reproducing the bug. Otherwise, an appendix with log files generated by the bug is welcome.
## Sign the contributor license agreement ## Read the contributor license agreement
It is required to sign the Contributor Licence Agreement(CLA) before a user submitting your code patch. Follow the [TaosData CLA](https://www.taosdata.com/en/contributor/) link to access the agreement and instructions on how to sign it. It is required to agree the Contributor Licence Agreement(CLA) before a user submitting his/her code patch. Follow the [TaosData CLA](https://www.taosdata.com/en/contributor/) link to read through the agreement.
## Submit your code ## Submit your code
Before submitting your code, make sure to [sign the contributor license agreement](#sign-the-contributor-license-agreement) beforehand. Your submission should solve an issue or add a feature registered in the [github issue tracker](https://github.com/taosdata/TDengine/issues). If no corresponding issue or feature is found in the issue tracker, please create one. When submitting your code to our repository, please create a pull request with the issue number included. Before submitting your code, make sure to [read the contributor license agreement](#read-the-contributor-license-agreement) beforehand. If you don't accept the aggreement, please stop submitting. Your submission means you have accepted the agreement. Your submission should solve an issue or add a feature registered in the [github issue tracker](https://github.com/taosdata/TDengine/issues). If no corresponding issue or feature is found in the issue tracker, please create one. When submitting your code to our repository, please create a pull request with the issue number included.
...@@ -23,45 +23,30 @@ For user manual, system design and architecture, engineering blogs, refer to [TD ...@@ -23,45 +23,30 @@ For user manual, system design and architecture, engineering blogs, refer to [TD
# Building # Building
At the moment, TDengine only supports building and running on Linux systems. You can choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) or from the source code. This quick guide is for installation from the source only. At the moment, TDengine only supports building and running on Linux systems. You can choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) or from the source code. This quick guide is for installation from the source only.
To build TDengine, use [CMake](https://cmake.org/) 2.8 or higher versions in the project directory. To build TDengine, use [CMake](https://cmake.org/) 2.8 or higher versions in the project directory. Install CMake for example on Ubuntu:
Install CMake for example on Ubuntu:
``` ```
sudo apt-get install -y cmake build-essential sudo apt-get install -y cmake build-essential
``` ```
To compile and package the JDBC driver source code, you should have a Java jdk-8 or higher and Apache Maven 2.7 or higher installed.
To install openjdk-8 on Ubuntu:
```
sudo apt-get install openjdk-8-jdk
```
To install Apache Maven on Ubuntu:
```
sudo apt-get install maven
```
Build TDengine: Build TDengine:
```cmd ```cmd
mkdir build && cd build mkdir build && cd build
cmake .. && cmake --build . cmake .. && cmake --build .
``` ```
# Installing
After building successfully, TDengine can be installed by:
```cmd
make install
```
Users can find more information about directories installed on the system in the [directory and files](https://www.taosdata.com/en/documentation/administrator/#Directory-and-Files) section. It should be noted that installing from source code does not configure service management for TDengine.
Users can also choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) for it.
# Running # Quick Run
<!-- TDengine uses _/etc/taos/taos.cfg_ as the default configuration file. This behavior can be changed with _-c_ option. For a quick start, we will make directories structured as: To quickly start a TDengine server after building, run the command below in terminal:
```
test/
+--data/
|
+--log/
|
+--cfg/
|
+--taos.cfg
```
Then fill the configuration file _test/cfg/taos.cfg_:
```
echo -e "dataDir $(pwd)/test/data\nlogDir $(pwd)/test/log" > test/cfg/taos.cfg
​``` -->
To start the TDengine server, run the command below in terminal:
```cmd ```cmd
./build/bin/taosd -c test/cfg ./build/bin/taosd -c test/cfg
``` ```
...@@ -69,22 +54,30 @@ In another terminal, use the TDengine shell to connect the server: ...@@ -69,22 +54,30 @@ In another terminal, use the TDengine shell to connect the server:
``` ```
./build/bin/taos -c test/cfg ./build/bin/taos -c test/cfg
``` ```
option "-c test/cfg" specifies the system configuration file directory.
# Installing
After building successfully, TDengine can be installed by:
```cmd
make install
```
Users can find more information about directories installed on the system in the [directory and files](https://www.taosdata.com/en/documentation/administrator/#Directory-and-Files) section. It should be noted that installing from source code does not configure service management for TDengine.
Users can also choose to [install from packages](https://www.taosdata.com/en/getting-started/#Install-from-Package) for it.
Start the service in the terminal. To start the service after installation, in a terminal, use:
```cmd ```cmd
taosd taosd
``` ```
Then users can use the [TDengine shell](https://www.taosdata.com/en/getting-started/#TDengine-Shell) to connect the TDengine server. Then users can use the [TDengine shell](https://www.taosdata.com/en/getting-started/#TDengine-Shell) to connect the TDengine server. In a terminal, use:
```cmd ```cmd
taos taos
``` ```
If the terminal connects the server successfully, welcome messages and version info are printed. Otherwise, an error message is shown. If TDengine shell connects the server successfully, welcome messages and version info are printed. Otherwise, an error message is shown.
# Try TDengine # Try TDengine
It is easy to run SQL commands in the terminal which is the same as other SQL databases. It is easy to run SQL commands from TDengine shell which is the same as other SQL databases.
```sql ```sql
create database db; create database db;
use db; use db;
......
...@@ -62,7 +62,7 @@ Time series data is a sequence of data points over time. Inside a table, the dat ...@@ -62,7 +62,7 @@ Time series data is a sequence of data points over time. Inside a table, the dat
To reduce the development complexity and improve data consistency, TDengine provides the pub/sub functionality. To publish a message, you simply insert a record into a table. Compared with popular messaging tool Kafka, you subscribe to a table or a SQL query statement, instead of a topic. Once new data points arrive, TDengine will notify the application. The process is just like Kafka. To reduce the development complexity and improve data consistency, TDengine provides the pub/sub functionality. To publish a message, you simply insert a record into a table. Compared with popular messaging tool Kafka, you subscribe to a table or a SQL query statement, instead of a topic. Once new data points arrive, TDengine will notify the application. The process is just like Kafka.
The detailed API will be introduced in the [connectors](https://www.taosdata.com/en/documentation/advanced-features/) section. The detailed API will be introduced in the [connectors](https://www.taosdata.com/en/documentation/connector/) section.
##Caching ##Caching
TDengine allocates a fixed-size buffer in memory, the newly arrived data will be written into the buffer first. Every device or table gets one or more memory blocks. For typical IoT scenarios, the hot data shall always be newly arrived data, they are more important for timely analysis. Based on this observation, TDengine manages the cache blocks in First-In-First-Out strategy. If no enough space in the buffer, the oldest data will be saved into hard disk first, then be overwritten by newly arrived data. TDengine also guarantees every device can keep at least one block of data in the buffer. TDengine allocates a fixed-size buffer in memory, the newly arrived data will be written into the buffer first. Every device or table gets one or more memory blocks. For typical IoT scenarios, the hot data shall always be newly arrived data, they are more important for timely analysis. Based on this observation, TDengine manages the cache blocks in First-In-First-Out strategy. If no enough space in the buffer, the oldest data will be saved into hard disk first, then be overwritten by newly arrived data. TDengine also guarantees every device can keep at least one block of data in the buffer.
...@@ -77,4 +77,4 @@ Through this design, caching tools like Redis are no longer needed in the system ...@@ -77,4 +77,4 @@ Through this design, caching tools like Redis are no longer needed in the system
TDengine creates one or more virtual nodes(vnode) in each data node. Each vnode contains data for multiple tables and has its own buffer. The buffer of a vnode is fully separated from the buffer of another vnode, not shared. But the tables in a vnode share the same buffer. TDengine creates one or more virtual nodes(vnode) in each data node. Each vnode contains data for multiple tables and has its own buffer. The buffer of a vnode is fully separated from the buffer of another vnode, not shared. But the tables in a vnode share the same buffer.
System configuration parameter cacheBlockSize configures the cache block size in bytes, and another parameter cacheNumOfBlocks configures the number of cache blocks. The total memory for the buffer of a vnode is `cacheBlockSize * cacheNumOfBlocks`​. Another system parameter `numOfBlocksPerMeter` configures the maximum number of cache blocks a table can use. When you create a database, you can specify these parameters. System configuration parameter cacheBlockSize configures the cache block size in bytes, and another parameter cacheNumOfBlocks configures the number of cache blocks. The total memory for the buffer of a vnode is `cacheBlockSize * cacheNumOfBlocks`​. Another system parameter `numOfBlocksPerMeter` configures the maximum number of cache blocks a table can use. When you create a database, you can specify these parameters.
\ No newline at end of file
...@@ -37,13 +37,13 @@ extern "C" { ...@@ -37,13 +37,13 @@ extern "C" {
struct SQLFunctionCtx; struct SQLFunctionCtx;
typedef struct SLocalDataSrc { typedef struct SLocalDataSource {
tExtMemBuffer *pMemBuffer; tExtMemBuffer *pMemBuffer;
int32_t flushoutIdx; int32_t flushoutIdx;
int32_t pageId; int32_t pageId;
int32_t rowIdx; int32_t rowIdx;
tFilePage filePage; tFilePage filePage;
} SLocalDataSrc; } SLocalDataSource;
enum { enum {
TSC_LOCALREDUCE_READY = 0x0, TSC_LOCALREDUCE_READY = 0x0,
...@@ -52,7 +52,7 @@ enum { ...@@ -52,7 +52,7 @@ enum {
}; };
typedef struct SLocalReducer { typedef struct SLocalReducer {
SLocalDataSrc **pLocalDataSrc; SLocalDataSource **pLocalDataSrc;
int32_t numOfBuffer; int32_t numOfBuffer;
int32_t numOfCompleted; int32_t numOfCompleted;
......
...@@ -41,21 +41,24 @@ typedef struct SParsedColElem { ...@@ -41,21 +41,24 @@ typedef struct SParsedColElem {
} SParsedColElem; } SParsedColElem;
typedef struct SParsedDataColInfo { typedef struct SParsedDataColInfo {
bool ordered; // denote if the timestamp in one data block ordered or not
int16_t numOfCols; int16_t numOfCols;
int16_t numOfAssignedCols; int16_t numOfAssignedCols;
SParsedColElem elems[TSDB_MAX_COLUMNS]; SParsedColElem elems[TSDB_MAX_COLUMNS];
bool hasVal[TSDB_MAX_COLUMNS]; bool hasVal[TSDB_MAX_COLUMNS];
int64_t prevTimestamp;
} SParsedDataColInfo; } SParsedDataColInfo;
SInsertedDataBlocks* tscCreateDataBlock(int32_t size); STableDataBlocks* tscCreateDataBlock(int32_t size);
void tscDestroyDataBlock(SInsertedDataBlocks** pDataBlock); void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
SDataBlockList* tscCreateBlockArrayList(); SDataBlockList* tscCreateBlockArrayList();
void tscDestroyBlockArrayList(SDataBlockList** pList); void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, SInsertedDataBlocks* pDataBlock); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList); void tscFreeUnusedDataBlocks(SDataBlockList* pList);
void tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId);
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
...@@ -66,8 +69,7 @@ bool tscIsTwoStageMergeMetricQuery(SSqlObj* pSql); ...@@ -66,8 +69,7 @@ bool tscIsTwoStageMergeMetricQuery(SSqlObj* pSql);
/** /**
* *
* for the projection query on metric or point interpolation query on metric, * for the projection query on metric or point interpolation query on metric,
* we iterate all the meters, instead of invoke query on all qualified meters * we iterate all the meters, instead of invoke query on all qualified meters simultaneously.
* simultaneously.
* *
* @param pSql sql object * @param pSql sql object
* @return * @return
...@@ -124,8 +126,7 @@ void tscIncStreamExecutionCount(void* pStream); ...@@ -124,8 +126,7 @@ void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId); bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId);
// get starter position of metric query condition (query on tags) in // get starter position of metric query condition (query on tags) in SSqlCmd.payload
// SSqlCmd.payload
char* tsGetMetricQueryCondPos(STagCond* pCond); char* tsGetMetricQueryCondPos(STagCond* pCond);
void tscTagCondAssign(STagCond* pDst, STagCond* pSrc); void tscTagCondAssign(STagCond* pDst, STagCond* pSrc);
void tscTagCondRelease(STagCond* pCond); void tscTagCondRelease(STagCond* pCond);
...@@ -139,6 +140,7 @@ void tscCleanSqlCmd(SSqlCmd* pCmd); ...@@ -139,6 +140,7 @@ void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql); bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscDoQuery(SSqlObj* pSql); void tscDoQuery(SSqlObj* pSql);
void sortRemoveDuplicates(STableDataBlocks* dataBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -169,16 +169,22 @@ typedef struct STagCond { ...@@ -169,16 +169,22 @@ typedef struct STagCond {
char * pData; char * pData;
} STagCond; } STagCond;
typedef struct SInsertedDataBlocks { typedef struct STableDataBlocks {
char meterId[TSDB_METER_ID_LEN]; char meterId[TSDB_METER_ID_LEN];
int64_t size; int64_t vgid;
uint32_t nAllocSize; int64_t size;
uint32_t numOfMeters;
int64_t prevTS;
bool ordered;
int32_t numOfMeters;
int32_t rowSize;
uint32_t nAllocSize;
union { union {
char *filename; char *filename;
char *pData; char *pData;
}; };
} SInsertedDataBlocks; } STableDataBlocks;
typedef struct SDataBlockList { typedef struct SDataBlockList {
int32_t idx; int32_t idx;
...@@ -186,7 +192,7 @@ typedef struct SDataBlockList { ...@@ -186,7 +192,7 @@ typedef struct SDataBlockList {
int32_t nAlloc; int32_t nAlloc;
char * userParam; /* user assigned parameters for async query */ char * userParam; /* user assigned parameters for async query */
void * udfp; /* user defined function pointer, used in async model */ void * udfp; /* user defined function pointer, used in async model */
SInsertedDataBlocks **pData; STableDataBlocks **pData;
} SDataBlockList; } SDataBlockList;
typedef struct { typedef struct {
......
...@@ -410,7 +410,7 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) ...@@ -410,7 +410,7 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
tscTrace("%p Async insertion completed, destroy data block list", pSql); tscTrace("%p Async insertion completed, destroy data block list", pSql);
// release data block data // release data block data
tscDestroyBlockArrayList(&pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// all data has been sent to vnode, call user function // all data has been sent to vnode, call user function
(*pSql->fp)(pSql->param, tres, numOfRows); (*pSql->fp)(pSql->param, tres, numOfRows);
......
此差异已折叠。
...@@ -143,9 +143,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -143,9 +143,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
tscCleanSqlCmd(pCmd);
tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
// transfer pInfo into select operation // transfer pInfo into select operation
switch (pInfo->sqlType) { switch (pInfo->sqlType) {
case DROP_TABLE: case DROP_TABLE:
...@@ -785,7 +782,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -785,7 +782,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// set sliding value // set sliding value
SSQLToken* pSliding = &pQuerySql->sliding; SSQLToken* pSliding = &pQuerySql->sliding;
if (pSliding->n != 0) { if (pSliding->n != 0) {
if (!tscEmbedded) { // pCmd->count == 1 means sql in stream function
if (!tscEmbedded && pCmd->count == 0) {
const char* msg = "not support sliding in query"; const char* msg = "not support sliding in query";
setErrMsg(pCmd, msg); setErrMsg(pCmd, msg);
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
......
...@@ -140,12 +140,10 @@ tSQLExpr *tSQLExprIdValueCreate(SSQLToken *pToken, int32_t optrType) { ...@@ -140,12 +140,10 @@ tSQLExpr *tSQLExprIdValueCreate(SSQLToken *pToken, int32_t optrType) {
nodePtr->val.nType = TSDB_DATA_TYPE_BIGINT; nodePtr->val.nType = TSDB_DATA_TYPE_BIGINT;
nodePtr->nSQLOptr = TK_TIMESTAMP; nodePtr->nSQLOptr = TK_TIMESTAMP;
} else { // must be field id if not numbers } else { // must be field id if not numbers
if (pToken != NULL) { assert(optrType == TK_ALL || optrType == TK_ID);
assert(optrType == TK_ID);
/* it must be the column name (tk_id) */ if (pToken != NULL) { // it must be the column name (tk_id)
nodePtr->colInfo = *pToken; nodePtr->colInfo = *pToken;
} else {
assert(optrType == TK_ALL);
} }
nodePtr->nSQLOptr = optrType; nodePtr->nSQLOptr = optrType;
......
此差异已折叠。
...@@ -287,14 +287,20 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -287,14 +287,20 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
pSql->thandle = NULL; pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
if (UTIL_METER_IS_METRIC(pCmd) && if (UTIL_METER_IS_METRIC(pCmd) && pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION) {
(pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID || pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION)) {
/* /*
* for metric query, in case of any meter missing during query, sub-query of metric query will failed, * for metric query, in case of any meter missing during query, sub-query of metric query will failed,
* causing metric query failed, and return TSDB_CODE_METRICMETA_EXPIRED code to app * causing metric query failed, and return TSDB_CODE_METRICMETA_EXPIRED code to app
*/ */
tscTrace("%p invalid meters id cause metric query failed, code:%d", pSql, pMsg->content[0]); tscTrace("%p invalid meters id cause metric query failed, code:%d", pSql, pMsg->content[0]);
code = TSDB_CODE_METRICMETA_EXPIRED; code = TSDB_CODE_METRICMETA_EXPIRED;
} else if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
/*
* session id is invalid(e.g., less than 0 or larger than maximum session per
* vnode) in submit/query msg, no retry
*/
code = TSDB_CODE_INVALID_QUERY_MSG;
} else if (pCmd->command == TSDB_SQL_CONNECT) { } else if (pCmd->command == TSDB_SQL_CONNECT) {
code = TSDB_CODE_NETWORK_UNAVAIL; code = TSDB_CODE_NETWORK_UNAVAIL;
} else if (pCmd->command == TSDB_SQL_HB) { } else if (pCmd->command == TSDB_SQL_HB) {
...@@ -358,14 +364,17 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -358,14 +364,17 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
} }
tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
/* /*
* There is not response callback function for submit response. * There is not response callback function for submit response.
* The actual inserted number of points is the first number. * The actual inserted number of points is the first number.
*/ */
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) { if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
pRes->numOfRows += *(int32_t *)pRes->pRsp; pRes->numOfRows += *(int32_t *)pRes->pRsp;
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
*(int32_t *)pRes->pRsp, pRes->rspLen);
} else {
tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
} }
} }
...@@ -421,7 +430,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -421,7 +430,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
return ahandle; return ahandle;
} }
static SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* pOld); static SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* prevSqlObj);
static int tscLaunchMetricSubQueries(SSqlObj *pSql); static int tscLaunchMetricSubQueries(SSqlObj *pSql);
int tscProcessSql(SSqlObj *pSql) { int tscProcessSql(SSqlObj *pSql) {
...@@ -430,12 +439,6 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -430,12 +439,6 @@ int tscProcessSql(SSqlObj *pSql) {
tscTrace("%p SQL cmd:%d will be processed, name:%s", pSql, pSql->cmd.command, pSql->cmd.name); tscTrace("%p SQL cmd:%d will be processed, name:%s", pSql, pSql->cmd.command, pSql->cmd.name);
// whether don't judge 'isInsertFromFile' ?
if (pSql->cmd.command == TSDB_SQL_INSERT && pCmd->isInsertFromFile == 1) {
// pCmd->isInsertFromFile = 0; // lihui: can not clear the flag
return 0;
}
pSql->retry = 0; pSql->retry = 0;
if (pSql->cmd.command < TSDB_SQL_MGMT) { if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->maxRetry = 2; pSql->maxRetry = 2;
...@@ -595,7 +598,6 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { ...@@ -595,7 +598,6 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL); SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, pNew->cmd.vnodeIdx); tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, pNew->cmd.vnodeIdx);
tscProcessSql(pNew); tscProcessSql(pNew);
} }
...@@ -665,7 +667,6 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq ...@@ -665,7 +667,6 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d",
pPObj, pSql, idx, *trsupport->code); pPObj, pSql, idx, *trsupport->code);
} else { } else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && *(trsupport->code) == TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && *(trsupport->code) == TSDB_CODE_SUCCESS) {
/* /*
* current query failed, and the retry count is less than the available count, * current query failed, and the retry count is less than the available count,
...@@ -675,11 +676,12 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq ...@@ -675,11 +676,12 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
// clear local saved number of results // clear local saved number of results
trsupport->localBuffer->numOfElems = 0; trsupport->localBuffer->numOfElems = 0;
pthread_mutex_unlock(&trsupport->queryMutex); pthread_mutex_unlock(&trsupport->queryMutex);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d, new SqlObj:%p", tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d, new SqlObj:%p",
trsupport->pParentSqlObj, pSql, numOfRows, idx, trsupport->numOfRetry, pNew); trsupport->pParentSqlObj, pSql, numOfRows, idx, trsupport->numOfRetry, pNew);
tscProcessSql(pNew); tscProcessSql(pNew);
return; return;
...@@ -689,7 +691,6 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq ...@@ -689,7 +691,6 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d",
pPObj, pSql, numOfRows, idx, *trsupport->code); pPObj, pSql, numOfRows, idx, *trsupport->code);
} }
} }
if (__sync_add_and_fetch_32(trsupport->numOfFinished, 1) < trsupport->numOfVnodes) { if (__sync_add_and_fetch_32(trsupport->numOfFinished, 1) < trsupport->numOfVnodes) {
...@@ -778,7 +779,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -778,7 +779,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d",
pPObj, pSql, pSvd->ip, pSvd->vnode, numOfRowsFromVnode, idx); pPObj, pSql, pSvd->ip, pSvd->vnode, numOfRowsFromVnode, idx);
tColModelCompress(pDesc->pSchema, trsupport->localBuffer, pDesc->pSchema->maxCapacity); tColModelCompact(pDesc->pSchema, trsupport->localBuffer, pDesc->pSchema->maxCapacity);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("%ld rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); printf("%ld rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
...@@ -877,7 +878,7 @@ void tscKillMetricQuery(SSqlObj *pSql) { ...@@ -877,7 +878,7 @@ void tscKillMetricQuery(SSqlObj *pSql) {
tscTrace("%p metric query is cancelled", pSql); tscTrace("%p metric query is cancelled", pSql);
} }
static SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* prevSqlObj) { SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* prevSqlObj) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlObj *pNew = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = (SSqlObj *)calloc(1, sizeof(SSqlObj));
...@@ -1032,8 +1033,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql) { ...@@ -1032,8 +1033,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.count); /* number of meters to be inserted */ pShellMsg->numOfSid = htonl(pSql->cmd.count); /* number of meters to be inserted */
pMsg += sizeof(SShellSubmitMsg);
/* /*
* pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here * pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
*/ */
...@@ -2264,8 +2263,6 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) { ...@@ -2264,8 +2263,6 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) {
SSqlGroupbyExpr *pGroupby = &pCmd->groupbyExpr; SSqlGroupbyExpr *pGroupby = &pCmd->groupbyExpr;
pMetaMsg->limit = htobe64(pCmd->glimit.limit);
pMetaMsg->offset = htobe64(pCmd->glimit.offset);
pMetaMsg->numOfTags = htons(pCmd->numOfReqTags); pMetaMsg->numOfTags = htons(pCmd->numOfReqTags);
pMetaMsg->numOfGroupbyCols = htons(pGroupby->numOfGroupbyCols); pMetaMsg->numOfGroupbyCols = htons(pGroupby->numOfGroupbyCols);
...@@ -2750,7 +2747,6 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId) { ...@@ -2750,7 +2747,6 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId) {
} else { } else {
pNew->fp = tscMeterMetaCallBack; pNew->fp = tscMeterMetaCallBack;
pNew->param = pSql; pNew->param = pSql;
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
code = tscProcessSql(pNew); code = tscProcessSql(pNew);
......
...@@ -72,7 +72,7 @@ TAOS *taos_connect_imp(char *ip, char *user, char *pass, char *db, int port, voi ...@@ -72,7 +72,7 @@ TAOS *taos_connect_imp(char *ip, char *user, char *pass, char *db, int port, voi
pObj->signature = pObj; pObj->signature = pObj;
strncpy(pObj->user, user, TSDB_USER_LEN); strncpy(pObj->user, user, TSDB_USER_LEN);
taosEncryptPass(pass, strlen(pass), pObj->pass); taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass);
pObj->mgmtPort = port ? port : tsMgmtShellPort; pObj->mgmtPort = port ? port : tsMgmtShellPort;
if (db) { if (db) {
......
...@@ -145,13 +145,14 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -145,13 +145,14 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql, int32_t numOfRows) { static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql, int32_t numOfRows) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
int64_t timestamp = *(int64_t *)pRes->data; int64_t timestamp = *(int64_t *)pRes->data;
int64_t actualTimestamp = pStream->stime - pStream->interval;
if (timestamp != pStream->stime) { if (timestamp != actualTimestamp) {
// reset the timestamp of each agg point by using start time of each interval // reset the timestamp of each agg point by using start time of each interval
*((int64_t *)pRes->data) = pStream->stime - pStream->interval; *((int64_t *)pRes->data) = actualTimestamp;
tscWarn("%p stream:%p, timestamp of points is:%lld, reset to %lld", pSql, pStream, timestamp, tscWarn("%p stream:%p, timestamp of points is:%lld, reset to %lld", pSql, pStream, timestamp, actualTimestamp);
pStream->stime - pStream->interval);
} }
} }
...@@ -397,7 +398,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in ...@@ -397,7 +398,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
} else { // timewindow based aggregation stream } else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now if (stime == 0) { // no data in meter till now
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime, stime); tscWarn("%p stream:%p, last timestamp:0, reset to:%lld", pSql, pStream, stime);
} else { } else {
int64_t newStime = (stime / pStream->interval) * pStream->interval; int64_t newStime = (stime / pStream->interval) * pStream->interval;
if (newStime != stime) { if (newStime != stime) {
...@@ -435,13 +436,25 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { ...@@ -435,13 +436,25 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
} }
static void setErrorInfo(STscObj* pObj, int32_t code, char* info) {
if (pObj == NULL) {
return;
}
SSqlCmd* pCmd = &pObj->pSql->cmd;
pObj->pSql->res.code = code;
strncpy(pCmd->payload, info, pCmd->payloadLen);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) { int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL; if (pObj == NULL || pObj->signature != pObj) return NULL;
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { // todo set corect error msg if (pSql == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
return NULL; return NULL;
} }
...@@ -451,22 +464,31 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, ...@@ -451,22 +464,31 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param,
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); tscAllocPayloadWithSize(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); pSql->sqlstr = strdup(sqlstr);
if (pSql->sqlstr == NULL) { // todo set corect error msg if (pSql->sqlstr == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
tfree(pSql); tfree(pSql);
return NULL; return NULL;
} }
strcpy(pSql->sqlstr, sqlstr);
sem_init(&pSql->rspSem, 0, 0); sem_init(&pSql->rspSem, 0, 0);
sem_init(&pSql->emptyRspSem, 0, 1); sem_init(&pSql->emptyRspSem, 0, 1);
SSqlInfo SQLInfo = {0}; SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr); tSQLParse(&SQLInfo, pSql->sqlstr);
tscCleanSqlCmd(&pSql->cmd);
tscAllocPayloadWithSize(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
//todo refactor later
pSql->cmd.count = 1;
pRes->code = tscToSQLCmd(pSql, &SQLInfo); pRes->code = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo); SQLInfoDestroy(&SQLInfo);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pObj, pRes->code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return NULL; return NULL;
...@@ -474,6 +496,8 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, ...@@ -474,6 +496,8 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param,
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream)); SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) { if (pStream == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return NULL; return NULL;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <math.h> #include <math.h>
#include <time.h> #include <time.h>
#include "ihash.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcache.h" #include "tcache.h"
#include "tkey.h" #include "tkey.h"
...@@ -31,9 +32,10 @@ ...@@ -31,9 +32,10 @@
/* /*
* the detailed information regarding metric meta key is: * the detailed information regarding metric meta key is:
* fullmetername + '.' + querycond + '.' + [tagId1, tagId2,...] + '.' + group_orderType + '.' + limit + '.' + offset * fullmetername + '.' + querycond + '.' + [tagId1, tagId2,...] + '.' + group_orderType
*
* if querycond is null, its format is: * if querycond is null, its format is:
* fullmetername + '.' + '(nil)' + '.' + [tagId1, tagId2,...] + '.' + group_orderType + '.' + limit + '.' + offset * fullmetername + '.' + '(nil)' + '.' + [tagId1, tagId2,...] + '.' + group_orderType
*/ */
void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr) { void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr) {
char* pTagCondStr = NULL; char* pTagCondStr = NULL;
...@@ -60,8 +62,7 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr) { ...@@ -60,8 +62,7 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr) {
pTagCondStr = strdup(tsGetMetricQueryCondPos(&pCmd->tagCond)); pTagCondStr = strdup(tsGetMetricQueryCondPos(&pCmd->tagCond));
} }
int32_t keyLen = sprintf(keyStr, "%s.%s.[%s].%d.%lld.%lld", pCmd->name, pTagCondStr, tagIdBuf, int32_t keyLen = sprintf(keyStr, "%s.%s.[%s].%d", pCmd->name, pTagCondStr, tagIdBuf, pCmd->groupbyExpr.orderType);
pCmd->groupbyExpr.orderType, pCmd->glimit.limit, pCmd->glimit.offset);
free(pTagCondStr); free(pTagCondStr);
assert(keyLen <= TSDB_MAX_TAGS_LEN); assert(keyLen <= TSDB_MAX_TAGS_LEN);
...@@ -142,8 +143,7 @@ bool tscProjectionQueryOnMetric(SSqlObj* pSql) { ...@@ -142,8 +143,7 @@ bool tscProjectionQueryOnMetric(SSqlObj* pSql) {
/* /*
* In following cases, return false for project query on metric * In following cases, return false for project query on metric
* 1. failed to get metermeta from server; 2. not a metric; 3. limit 0; 4. * 1. failed to get metermeta from server; 2. not a metric; 3. limit 0; 4. show query, instead of a select query
* show query, instead of a select query
*/ */
if (pCmd->pMeterMeta == NULL || !UTIL_METER_IS_METRIC(pCmd) || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || if (pCmd->pMeterMeta == NULL || !UTIL_METER_IS_METRIC(pCmd) || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->exprsInfo.numOfExprs == 0) { pCmd->exprsInfo.numOfExprs == 0) {
...@@ -252,7 +252,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { ...@@ -252,7 +252,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
} }
void tscfreeSqlCmdData(SSqlCmd* pCmd) { void tscfreeSqlCmdData(SSqlCmd* pCmd) {
tscDestroyBlockArrayList(&pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscTagCondRelease(&pCmd->tagCond); tscTagCondRelease(&pCmd->tagCond);
tscClearFieldInfo(pCmd); tscClearFieldInfo(pCmd);
...@@ -334,20 +334,22 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -334,20 +334,22 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql); free(pSql);
} }
SInsertedDataBlocks* tscCreateDataBlock(int32_t size) { STableDataBlocks* tscCreateDataBlock(int32_t size) {
SInsertedDataBlocks* dataBuf = (SInsertedDataBlocks*)calloc(1, sizeof(SInsertedDataBlocks)); STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
dataBuf->nAllocSize = (uint32_t) size; dataBuf->nAllocSize = (uint32_t)size;
dataBuf->pData = calloc(1, dataBuf->nAllocSize); dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
return dataBuf; return dataBuf;
} }
void tscDestroyDataBlock(SInsertedDataBlocks** pDataBlock) { void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
if (*pDataBlock == NULL) { if (pDataBlock == NULL) {
return; return;
} }
tfree((*pDataBlock)->pData); tfree(pDataBlock->pData);
tfree(*pDataBlock); tfree(pDataBlock);
} }
SDataBlockList* tscCreateBlockArrayList() { SDataBlockList* tscCreateBlockArrayList() {
...@@ -360,29 +362,31 @@ SDataBlockList* tscCreateBlockArrayList() { ...@@ -360,29 +362,31 @@ SDataBlockList* tscCreateBlockArrayList() {
return pDataBlockArrayList; return pDataBlockArrayList;
} }
void tscDestroyBlockArrayList(SDataBlockList** pList) { void* tscDestroyBlockArrayList(SDataBlockList* pList) {
if (*pList == NULL) { if (pList == NULL) {
return; return NULL;
} }
for (int32_t i = 0; i < (*pList)->nSize; i++) { for (int32_t i = 0; i < pList->nSize; i++) {
tscDestroyDataBlock(&(*pList)->pData[i]); tscDestroyDataBlock(pList->pData[i]);
} }
tfree((*pList)->pData); tfree(pList->pData);
tfree(*pList); tfree(pList);
return NULL;
} }
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, SInsertedDataBlocks* pDataBlock) { int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
pCmd->count = pDataBlock->numOfMeters; pCmd->count = pDataBlock->numOfMeters;
strcpy(pCmd->name, pDataBlock->meterId); strncpy(pCmd->name, pDataBlock->meterId, TSDB_METER_ID_LEN);
tscAllocPayloadWithSize(pCmd, pDataBlock->nAllocSize); tscAllocPayloadWithSize(pCmd, pDataBlock->nAllocSize);
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize); memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize);
/* set the message length */ // set the message length
pCmd->payloadLen = pDataBlock->nAllocSize; pCmd->payloadLen = pDataBlock->nAllocSize;
return tscGetMeterMeta(pSql, pCmd->name); return tscGetMeterMeta(pSql, pCmd->name);
} }
...@@ -390,10 +394,89 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, SInsertedDataBlocks* pDataBlock ...@@ -390,10 +394,89 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, SInsertedDataBlocks* pDataBlock
void tscFreeUnusedDataBlocks(SDataBlockList* pList) { void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
/* release additional memory consumption */ /* release additional memory consumption */
for (int32_t i = 0; i < pList->nSize; ++i) { for (int32_t i = 0; i < pList->nSize; ++i) {
SInsertedDataBlocks* pDataBlock = pList->pData[i]; STableDataBlocks* pDataBlock = pList->pData[i];
pDataBlock->pData = realloc(pDataBlock->pData, (size_t) pDataBlock->size); pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size);
pDataBlock->nAllocSize = (uint32_t) pDataBlock->size; pDataBlock->nAllocSize = (uint32_t)pDataBlock->size;
}
}
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name) {
STableDataBlocks* dataBuf = tscCreateDataBlock(size);
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN);
return dataBuf;
}
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId) {
STableDataBlocks* dataBuf = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosGetIntHashData(pHashList, id);
if (t1 != NULL) {
dataBuf = *t1;
}
if (dataBuf == NULL) {
dataBuf = tscCreateDataBlockEx((size_t)size, rowSize, startOffset, tableId);
dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf);
tscAppendDataBlock(pDataBlockList, dataBuf);
}
return dataBuf;
}
void tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosInitIntHash(8, sizeof(void*), taosHashInt);
SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList();
for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) {
STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];
STableDataBlocks* dataBuf =
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE,
tsInsertHeadSize, 0, pOneTableBlock->meterId);
int64_t destSize = dataBuf->size + pOneTableBlock->size;
if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
}
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
if (tmp != NULL) {
dataBuf->pData = tmp;
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
} else {
// to do handle error
}
}
SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData;
sortRemoveDuplicates(pOneTableBlock);
tscTrace("%p meterId:%s, sid:%d, rows:%d, sversion:%d", pSql, pOneTableBlock->meterId, pBlocks->sid,
pBlocks->numOfRows, pBlocks->sversion);
pBlocks->sid = htonl(pBlocks->sid);
pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows);
memcpy(dataBuf->pData + dataBuf->size, pOneTableBlock->pData, pOneTableBlock->size);
dataBuf->size += pOneTableBlock->size;
dataBuf->numOfMeters += 1;
} }
tscDestroyBlockArrayList(pTableDataBlockList);
// free the table data blocks;
pCmd->pDataBlocks = pVnodeDataBlockList;
tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
taosCleanUpIntHash(pVnodeDataBlockHashList);
} }
void tscCloseTscObj(STscObj* pObj) { void tscCloseTscObj(STscObj* pObj) {
...@@ -802,7 +885,7 @@ static int32_t validateQuoteToken(SSQLToken* pToken) { ...@@ -802,7 +885,7 @@ static int32_t validateQuoteToken(SSQLToken* pToken) {
if (pToken->type == TK_STRING) { if (pToken->type == TK_STRING) {
return tscValidateName(pToken); return tscValidateName(pToken);
} }
if (k != pToken->n || pToken->type != TK_ID) { if (k != pToken->n || pToken->type != TK_ID) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
...@@ -821,17 +904,20 @@ int32_t tscValidateName(SSQLToken* pToken) { ...@@ -821,17 +904,20 @@ int32_t tscValidateName(SSQLToken* pToken) {
pToken->n = strdequote(pToken->z); pToken->n = strdequote(pToken->z);
strtrim(pToken->z); strtrim(pToken->z);
pToken->n = (uint32_t)strlen(pToken->z); pToken->n = (uint32_t)strlen(pToken->z);
int len = tSQLGetToken(pToken->z, &pToken->type);
if (len == pToken->n){ int len = tSQLGetToken(pToken->z, &pToken->type);
// single token, validate it
if (len == pToken->n) {
return validateQuoteToken(pToken); return validateQuoteToken(pToken);
} } else {
else { sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n);
sep = strnchrNoquote(pToken->z, TS_PATH_DELIMITER[0], pToken->n); if (sep == NULL) {
if (sep == NULL) { return TSDB_CODE_INVALID_SQL;
return TSDB_CODE_INVALID_SQL; }
}
return tscValidateName(pToken); return tscValidateName(pToken);
} }
} else { } else {
if (isNumber(pToken)) { if (isNumber(pToken)) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
...@@ -843,7 +929,7 @@ int32_t tscValidateName(SSQLToken* pToken) { ...@@ -843,7 +929,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
if (pToken->type == TK_SPACE) { if (pToken->type == TK_SPACE) {
strtrim(pToken->z); strtrim(pToken->z);
pToken->n = (uint32_t)strlen(pToken->z); pToken->n = (uint32_t)strlen(pToken->z);
} }
pToken->n = tSQLGetToken(pToken->z, &pToken->type); pToken->n = tSQLGetToken(pToken->z, &pToken->type);
...@@ -965,8 +1051,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { ...@@ -965,8 +1051,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) {
SSqlObj* pHeatBeat = pObj->pHb; SSqlObj* pHeatBeat = pObj->pHb;
assert(pHeatBeat == pHeatBeat->signature); assert(pHeatBeat == pHeatBeat->signature);
pHeatBeat->cmd.type = 1; // to denote the heart-beat timer close connection pHeatBeat->cmd.type = 1; // to denote the heart-beat timer close connection and free all allocated resources
// and free all allocated resources
} }
bool tscShouldFreeHeatBeat(SSqlObj* pHb) { bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
...@@ -1052,7 +1137,6 @@ void tscDoQuery(SSqlObj* pSql) { ...@@ -1052,7 +1137,6 @@ void tscDoQuery(SSqlObj* pSql) {
if (pCmd->command > TSDB_SQL_LOCAL) { if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql); tscProcessLocalCmd(pSql);
} else { } else {
// add to sql list, so that the show queries could get the query info
if (pCmd->command == TSDB_SQL_SELECT) { if (pCmd->command == TSDB_SQL_SELECT) {
tscAddIntoSqlList(pSql); tscAddIntoSqlList(pSql);
} }
...@@ -1061,18 +1145,19 @@ void tscDoQuery(SSqlObj* pSql) { ...@@ -1061,18 +1145,19 @@ void tscDoQuery(SSqlObj* pSql) {
pSql->cmd.vnodeIdx += 1; pSql->cmd.vnodeIdx += 1;
} }
if (pSql->fp == NULL) { void* fp = pSql->fp;
if (0 == pCmd->isInsertFromFile) {
tscProcessSql(pSql); if (pCmd->isInsertFromFile == 1) {
tscProcessMultiVnodesInsert(pSql); // handle the multi-vnode insertion tscProcessMultiVnodesInsertForFile(pSql);
} else if (1 == pCmd->isInsertFromFile) {
tscProcessMultiVnodesInsertForFile(pSql);
} else {
assert(false);
}
} else { } else {
// pSql may be released in this function if it is a async insertion.
tscProcessSql(pSql); tscProcessSql(pSql);
}
// handle the multi-vnode insertion for sync model
if (fp == NULL) {
assert(pSql->signature == pSql);
tscProcessMultiVnodesInsert(pSql);
}
}
} }
} }
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>1.0.0</version> <version>1.0.1</version>
<name>JDBCDriver</name> <name>JDBCDriver</name>
<description>TDengine JDBC Driver</description> <description>TDengine JDBC Driver</description>
<properties> <properties>
......
...@@ -24,6 +24,8 @@ public abstract class TSDBConstants { ...@@ -24,6 +24,8 @@ public abstract class TSDBConstants {
public static final String INVALID_VARIABLES = "invalid variables"; public static final String INVALID_VARIABLES = "invalid variables";
public static Map<Integer, String> DATATYPE_MAP = null; public static Map<Integer, String> DATATYPE_MAP = null;
public static final long JNI_NULL_POINTER = 0L;
public static final int JNI_SUCCESS = 0; public static final int JNI_SUCCESS = 0;
public static final int JNI_TDENGINE_ERROR = -1; public static final int JNI_TDENGINE_ERROR = -1;
public static final int JNI_CONNECTION_NULL = -2; public static final int JNI_CONNECTION_NULL = -2;
......
...@@ -19,7 +19,6 @@ import java.sql.SQLWarning; ...@@ -19,7 +19,6 @@ import java.sql.SQLWarning;
import java.util.List; import java.util.List;
public class TSDBJNIConnector { public class TSDBJNIConnector {
static final long INVALID_CONNECTION_POINTER_VALUE = 0l;
static volatile Boolean isInitialized = false; static volatile Boolean isInitialized = false;
static { static {
...@@ -29,7 +28,12 @@ public class TSDBJNIConnector { ...@@ -29,7 +28,12 @@ public class TSDBJNIConnector {
/** /**
* Connection pointer used in C * Connection pointer used in C
*/ */
private long taos = INVALID_CONNECTION_POINTER_VALUE; private long taos = TSDBConstants.JNI_NULL_POINTER;
/**
* Result set pointer for the current connection
*/
private long taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
/** /**
* result set status in current connection * result set status in current connection
...@@ -41,7 +45,7 @@ public class TSDBJNIConnector { ...@@ -41,7 +45,7 @@ public class TSDBJNIConnector {
* Whether the connection is closed * Whether the connection is closed
*/ */
public boolean isClosed() { public boolean isClosed() {
return this.taos == INVALID_CONNECTION_POINTER_VALUE; return this.taos == TSDBConstants.JNI_NULL_POINTER;
} }
/** /**
...@@ -86,13 +90,13 @@ public class TSDBJNIConnector { ...@@ -86,13 +90,13 @@ public class TSDBJNIConnector {
* @throws SQLException * @throws SQLException
*/ */
public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException { public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
if (this.taos != INVALID_CONNECTION_POINTER_VALUE) { if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
this.closeConnectionImp(this.taos); this.closeConnectionImp(this.taos);
this.taos = INVALID_CONNECTION_POINTER_VALUE; this.taos = TSDBConstants.JNI_NULL_POINTER;
} }
this.taos = this.connectImp(host, port, dbName, user, password); this.taos = this.connectImp(host, port, dbName, user, password);
if (this.taos == INVALID_CONNECTION_POINTER_VALUE) { if (this.taos == TSDBConstants.JNI_NULL_POINTER) {
throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg()), "", this.getErrCode()); throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg()), "", this.getErrCode());
} }
...@@ -108,13 +112,7 @@ public class TSDBJNIConnector { ...@@ -108,13 +112,7 @@ public class TSDBJNIConnector {
*/ */
public int executeQuery(String sql) throws SQLException { public int executeQuery(String sql) throws SQLException {
if (!this.isResultsetClosed) { if (!this.isResultsetClosed) {
//throw new RuntimeException(TSDBConstants.WrapErrMsg("Connection already has an open result set")); freeResultSet(taosResultSetPointer);
long resultSetPointer = this.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
//do nothing
} else {
this.freeResultSet(resultSetPointer);
}
} }
int code; int code;
...@@ -133,7 +131,14 @@ public class TSDBJNIConnector { ...@@ -133,7 +131,14 @@ public class TSDBJNIConnector {
throw new SQLException(TSDBConstants.FixErrMsg(code), "", this.getErrCode()); throw new SQLException(TSDBConstants.FixErrMsg(code), "", this.getErrCode());
} }
} }
// Try retrieving result set for the executed SQLusing the current connection pointer. If the executed
// SQL is a DML/DDL which doesn't return a result set, then taosResultSetPointer should be 0L. Otherwise,
// taosResultSetPointer should be a non-zero value.
taosResultSetPointer = this.getResultSetImp(this.taos);
if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
isResultsetClosed = false;
}
return code; return code;
} }
...@@ -162,8 +167,7 @@ public class TSDBJNIConnector { ...@@ -162,8 +167,7 @@ public class TSDBJNIConnector {
* Each connection should have a single open result set at a time * Each connection should have a single open result set at a time
*/ */
public long getResultSet() { public long getResultSet() {
long res = this.getResultSetImp(this.taos); return taosResultSetPointer;
return res;
} }
private native long getResultSetImp(long connection); private native long getResultSetImp(long connection);
...@@ -172,11 +176,31 @@ public class TSDBJNIConnector { ...@@ -172,11 +176,31 @@ public class TSDBJNIConnector {
* Free resultset operation from C to release resultset pointer by JNI * Free resultset operation from C to release resultset pointer by JNI
*/ */
public int freeResultSet(long result) { public int freeResultSet(long result) {
int res = this.freeResultSetImp(this.taos, result); int res = TSDBConstants.JNI_SUCCESS;
this.isResultsetClosed = true; // reset resultSetPointer to 0 after freeResultSetImp() return if (result != taosResultSetPointer && taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
return res; throw new RuntimeException("Invalid result set pointer");
} else if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER){
res = this.freeResultSetImp(this.taos, result);
isResultsetClosed = true; // reset resultSetPointer to 0 after freeResultSetImp() return
taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
}
return res;
} }
/**
* Close the open result set which is associated to the current connection. If the result set is already
* closed, return 0 for success.
* @return
*/
public int freeResultSet() {
int resCode = TSDBConstants.JNI_SUCCESS;
if (!isResultsetClosed) {
resCode = this.freeResultSetImp(this.taos, this.taosResultSetPointer);
taosResultSetPointer = TSDBConstants.JNI_NULL_POINTER;
}
return resCode;
}
private native int freeResultSetImp(long connection, long result); private native int freeResultSetImp(long connection, long result);
/** /**
...@@ -220,7 +244,7 @@ public class TSDBJNIConnector { ...@@ -220,7 +244,7 @@ public class TSDBJNIConnector {
if (code < 0) { if (code < 0) {
throw new SQLException(TSDBConstants.FixErrMsg(code), "", this.getErrCode()); throw new SQLException(TSDBConstants.FixErrMsg(code), "", this.getErrCode());
} else if (code == 0){ } else if (code == 0){
this.taos = INVALID_CONNECTION_POINTER_VALUE; this.taos = TSDBConstants.JNI_NULL_POINTER;
} else { } else {
throw new SQLException("Undefined error code returned by TDengine when closing a connection"); throw new SQLException("Undefined error code returned by TDengine when closing a connection");
} }
......
...@@ -244,7 +244,7 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -244,7 +244,7 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
@Override @Override
public boolean execute() throws SQLException { public boolean execute() throws SQLException {
return executeUpdate(getNativeSql()) == 0; return super.execute(getNativeSql());
} }
@Override @Override
......
...@@ -27,8 +27,14 @@ public class TSDBStatement implements Statement { ...@@ -27,8 +27,14 @@ public class TSDBStatement implements Statement {
/** Timeout for a query */ /** Timeout for a query */
protected int queryTimeout = 0; protected int queryTimeout = 0;
/**
* Status of current statement
*/
private boolean isClosed = true;
TSDBStatement(TSDBJNIConnector connecter) { TSDBStatement(TSDBJNIConnector connecter) {
this.connecter = connecter; this.connecter = connecter;
this.isClosed = false;
} }
public <T> T unwrap(Class<T> iface) throws SQLException { public <T> T unwrap(Class<T> iface) throws SQLException {
...@@ -40,13 +46,16 @@ public class TSDBStatement implements Statement { ...@@ -40,13 +46,16 @@ public class TSDBStatement implements Statement {
} }
public ResultSet executeQuery(String sql) throws SQLException { public ResultSet executeQuery(String sql) throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
this.connecter.executeQuery(sql); this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet(); long resultSetPointer = this.connecter.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) { if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer == 0) { } else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
return null; return null;
} else { } else {
return new TSDBResultSet(this.connecter, resultSetPointer); return new TSDBResultSet(this.connecter, resultSetPointer);
...@@ -54,7 +63,20 @@ public class TSDBStatement implements Statement { ...@@ -54,7 +63,20 @@ public class TSDBStatement implements Statement {
} }
public int executeUpdate(String sql) throws SQLException { public int executeUpdate(String sql) throws SQLException {
return this.connecter.executeQuery(sql); if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
int res = this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
this.connecter.freeResultSet();
throw new SQLException("The executed SQL is not a DML or a DDL");
} else {
return res;
}
} }
public String getErrorMsg() { public String getErrorMsg() {
...@@ -62,6 +84,12 @@ public class TSDBStatement implements Statement { ...@@ -62,6 +84,12 @@ public class TSDBStatement implements Statement {
} }
public void close() throws SQLException { public void close() throws SQLException {
if (!isClosed) {
if (!this.connecter.isResultsetClosed()) {
this.connecter.freeResultSet();
}
isClosed = true;
}
} }
public int getMaxFieldSize() throws SQLException { public int getMaxFieldSize() throws SQLException {
...@@ -110,19 +138,38 @@ public class TSDBStatement implements Statement { ...@@ -110,19 +138,38 @@ public class TSDBStatement implements Statement {
} }
public boolean execute(String sql) throws SQLException { public boolean execute(String sql) throws SQLException {
return executeUpdate(sql) == 0; if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
boolean res = true;
this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
// no result set is retrieved
res = false;
}
return res;
} }
public ResultSet getResultSet() throws SQLException { public ResultSet getResultSet() throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
long resultSetPointer = connecter.getResultSet(); long resultSetPointer = connecter.getResultSet();
TSDBResultSet resSet = null; TSDBResultSet resSet = null;
if (resultSetPointer != 0l) { if (resultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
resSet = new TSDBResultSet(connecter, resultSetPointer); resSet = new TSDBResultSet(connecter, resultSetPointer);
} }
return resSet; return resSet;
} }
public int getUpdateCount() throws SQLException { public int getUpdateCount() throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
return this.connecter.getAffectedRows(); return this.connecter.getAffectedRows();
} }
...@@ -171,6 +218,9 @@ public class TSDBStatement implements Statement { ...@@ -171,6 +218,9 @@ public class TSDBStatement implements Statement {
} }
public int[] executeBatch() throws SQLException { public int[] executeBatch() throws SQLException {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
if (batchedArgs == null) { if (batchedArgs == null) {
throw new SQLException(TSDBConstants.WrapErrMsg("Batch is empty!")); throw new SQLException(TSDBConstants.WrapErrMsg("Batch is empty!"));
} else { } else {
...@@ -223,7 +273,7 @@ public class TSDBStatement implements Statement { ...@@ -223,7 +273,7 @@ public class TSDBStatement implements Statement {
} }
public boolean isClosed() throws SQLException { public boolean isClosed() throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG); return isClosed;
} }
public void setPoolable(boolean poolable) throws SQLException { public void setPoolable(boolean poolable) throws SQLException {
......
...@@ -184,7 +184,7 @@ void tColModelDisplayEx(tColModel *pModel, void *pData, int32_t numOfRows, int32 ...@@ -184,7 +184,7 @@ void tColModelDisplayEx(tColModel *pModel, void *pData, int32_t numOfRows, int32
/* /*
* compress data into consecutive block without hole in data * compress data into consecutive block without hole in data
*/ */
void tColModelCompress(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity); void tColModelCompact(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity);
void tColModelErase(tColModel *pModel, tFilePage *inputBuffer, int32_t maxCapacity, int32_t s, int32_t e); void tColModelErase(tColModel *pModel, tFilePage *inputBuffer, int32_t maxCapacity, int32_t s, int32_t e);
......
...@@ -69,7 +69,7 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo *pInterpoInfo, int64_t *p ...@@ -69,7 +69,7 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo *pInterpoInfo, int64_t *p
* @param pInterpoInfo * @param pInterpoInfo
* @return * @return
*/ */
bool taosHasNoneInterpoPoints(SInterpolationInfo *pInterpoInfo); bool taosHasRemainsDataForInterpolation(SInterpolationInfo *pInterpoInfo);
int32_t taosNumOfRemainPoints(SInterpolationInfo *pInterpoInfo); int32_t taosNumOfRemainPoints(SInterpolationInfo *pInterpoInfo);
......
...@@ -626,6 +626,7 @@ void source_file(TAOS *con, char *fptr) { ...@@ -626,6 +626,7 @@ void source_file(TAOS *con, char *fptr) {
} }
while ((read_len = getline(&line, &line_len, f)) != -1) { while ((read_len = getline(&line, &line_len, f)) != -1) {
if (read_len >= MAX_COMMAND_SIZE) continue;
line[--read_len] = '\0'; line[--read_len] = '\0';
if (read_len == 0 || isCommentLine(line)) { // line starts with # if (read_len == 0 || isCommentLine(line)) { // line starts with #
......
...@@ -64,14 +64,12 @@ bool gcProcessLoginRequest(HttpContext* pContext) { ...@@ -64,14 +64,12 @@ bool gcProcessLoginRequest(HttpContext* pContext) {
//[{ //[{
// "refId": "A", // "refId": "A",
// "alias" : "taosd", // "alias" : "taosd",
// "sql" : "select first(taosd) from sys.mem where ts > now-6h and ts < now // "sql" : "select first(taosd) from sys.mem where ts > now-6h and ts < now interval(20000a)"
// interval(20000a)"
//}, //},
//{ //{
// "refId": "B", // "refId": "B",
// "alias" : "system", // "alias" : "system",
// "sql" : "select first(taosd) from sys.mem where ts > now-6h and ts < now // "sql" : "select first(taosd) from sys.mem where ts > now-6h and ts < now interval(20000a)"
// interval(20000a)"
//}] //}]
// output // output
//[{ //[{
......
...@@ -20,7 +20,8 @@ char* httpMsg[] = { ...@@ -20,7 +20,8 @@ char* httpMsg[] = {
"http method parse error", // 3 "http method parse error", // 3
"http version should be 1.0, 1.1 or 1.2", // 4 "http version should be 1.0, 1.1 or 1.2", // 4
"http head parse error", // 5 "http head parse error", // 5
"request size is too big", "http body size invalid", "request size is too big",
"http body size invalid",
"http chunked body parse error", // 8 "http chunked body parse error", // 8
"http url parse error", // 9 "http url parse error", // 9
"invalid type of Authorization", "invalid type of Authorization",
...@@ -52,7 +53,8 @@ char* httpMsg[] = { ...@@ -52,7 +53,8 @@ char* httpMsg[] = {
"tags not find", "tags not find",
"tags size is 0", "tags size is 0",
"tags size too long", // 36 "tags size too long", // 36
"tag is null", "tag name is null", "tag is null",
"tag name is null",
"tag name length too long", // 39 "tag name length too long", // 39
"tag value type should be number or string", "tag value type should be number or string",
"tag value is null", "tag value is null",
......
...@@ -69,11 +69,12 @@ enum _sync_cmd { ...@@ -69,11 +69,12 @@ enum _sync_cmd {
}; };
enum _meter_state { enum _meter_state {
TSDB_METER_STATE_READY, TSDB_METER_STATE_READY = 0x00,
TSDB_METER_STATE_IMPORTING, TSDB_METER_STATE_INSERT = 0x01,
TSDB_METER_STATE_UPDATING, TSDB_METER_STATE_IMPORTING = 0x02,
TSDB_METER_STATE_DELETING, TSDB_METER_STATE_UPDATING = 0x04,
TSDB_METER_STATE_DELETED, TSDB_METER_STATE_DELETING = 0x10,
TSDB_METER_STATE_DELETED = 0x18,
}; };
typedef struct { typedef struct {
...@@ -184,10 +185,10 @@ typedef struct _meter_obj { ...@@ -184,10 +185,10 @@ typedef struct _meter_obj {
short sqlLen; short sqlLen;
char searchAlgorithm : 4; char searchAlgorithm : 4;
char compAlgorithm : 4; char compAlgorithm : 4;
char state : 5; // deleted or added, 1: added char status; // 0: ok, 1: stop stream computing
char status : 3; // 0: ok, 1: stop stream computing
char reserved[16]; char reserved[16];
int state;
int numOfQueries; int numOfQueries;
char * pSql; char * pSql;
void * pStream; void * pStream;
...@@ -418,10 +419,6 @@ void vnodeCommitOver(SVnodeObj *pVnode); ...@@ -418,10 +419,6 @@ void vnodeCommitOver(SVnodeObj *pVnode);
TSKEY vnodeGetFirstKey(int vnode); TSKEY vnodeGetFirstKey(int vnode);
int vnodeSyncRetrieveCache(int vnode, int fd);
int vnodeSyncRestoreCache(int vnode, int fd);
pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode); pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode);
void vnodeCancelCommit(SVnodeObj *pVnode); void vnodeCancelCommit(SVnodeObj *pVnode);
...@@ -447,10 +444,6 @@ void *vnodeCommitToFile(void *param); ...@@ -447,10 +444,6 @@ void *vnodeCommitToFile(void *param);
void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid); void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid);
int vnodeSyncRetrieveFile(int vnode, int fd, uint32_t fileId, uint64_t *fmagic);
int vnodeSyncRestoreFile(int vnode, int sfd);
int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pBlock, SData *data[], SData *cdata[], int pointsRead); int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pBlock, SData *data[], SData *cdata[], int pointsRead);
int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery); int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery);
...@@ -476,14 +469,8 @@ void *vnodeGetMeterPeerConnection(SMeterObj *pObj, int index); ...@@ -476,14 +469,8 @@ void *vnodeGetMeterPeerConnection(SMeterObj *pObj, int index);
int vnodeForwardToPeer(SMeterObj *pObj, char *msg, int msgLen, char action, int sversion); int vnodeForwardToPeer(SMeterObj *pObj, char *msg, int msgLen, char action, int sversion);
void vnodeCloseAllSyncFds(int vnode);
void vnodeConfigVPeers(int vnode, int numOfPeers, SVPeerDesc peerDesc[]); void vnodeConfigVPeers(int vnode, int numOfPeers, SVPeerDesc peerDesc[]);
void vnodeStartSyncProcess(SVnodeObj *pVnode);
void vnodeCancelSync(int vnode);
void vnodeListPeerStatus(char *buffer); void vnodeListPeerStatus(char *buffer);
void vnodeCheckOwnStatus(SVnodeObj *pVnode); void vnodeCheckOwnStatus(SVnodeObj *pVnode);
...@@ -499,7 +486,7 @@ int vnodeInitStore(); ...@@ -499,7 +486,7 @@ int vnodeInitStore();
void vnodeCleanUpVnodes(); void vnodeCleanUpVnodes();
void vnodeRemoveVnode(int vnode); int vnodeRemoveVnode(int vnode);
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc); int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc);
......
...@@ -75,6 +75,12 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSid ...@@ -75,6 +75,12 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSid
void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc); void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc);
int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state);
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeSetMeterDeleting(SMeterObj* pMeterObj);
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -113,7 +113,7 @@ int vnodeProcessCreateMeterRequest(char *pMsg) { ...@@ -113,7 +113,7 @@ int vnodeProcessCreateMeterRequest(char *pMsg) {
pVnode = vnodeList + vid; pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0) { if (pVnode->cfg.maxSessions <= 0) {
dError("vid:%d, not activated", vid); dError("vid:%d, not activated", vid);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _over; goto _over;
} }
...@@ -215,7 +215,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg) { ...@@ -215,7 +215,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg) {
if (pVnode->pCachePool == NULL) { if (pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pCreate->vnode); dError("vid:%d is not activated yet", pCreate->vnode);
vnodeSendVpeerCfgMsg(pCreate->vnode); vnodeSendVpeerCfgMsg(pCreate->vnode);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _create_over; goto _create_over;
} }
...@@ -445,7 +445,8 @@ int vnodeProcessFreeVnodeRequest(char *pMsg) { ...@@ -445,7 +445,8 @@ int vnodeProcessFreeVnodeRequest(char *pMsg) {
} }
dTrace("vid:%d receive free vnode message", pFree->vnode); dTrace("vid:%d receive free vnode message", pFree->vnode);
vnodeRemoveVnode(pFree->vnode); int32_t code = vnodeRemoveVnode(pFree->vnode);
assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS);
pStart = (char *)malloc(128); pStart = (char *)malloc(128);
if (pStart == NULL) return 0; if (pStart == NULL) return 0;
...@@ -453,7 +454,7 @@ int vnodeProcessFreeVnodeRequest(char *pMsg) { ...@@ -453,7 +454,7 @@ int vnodeProcessFreeVnodeRequest(char *pMsg) {
*pStart = TSDB_MSG_TYPE_FREE_VNODE_RSP; *pStart = TSDB_MSG_TYPE_FREE_VNODE_RSP;
pMsg = pStart + 1; pMsg = pStart + 1;
*pMsg = 0; *pMsg = code;
vnodeSendMsgToMgmt(pStart); vnodeSendMsgToMgmt(pStart);
return 0; return 0;
......
...@@ -44,7 +44,7 @@ void dnodeInitModules() { ...@@ -44,7 +44,7 @@ void dnodeInitModules() {
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem; tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem; tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem; tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = tsEnableHttpModule ? -1 : 0; tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0; tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0; tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0;
...@@ -53,7 +53,7 @@ void dnodeInitModules() { ...@@ -53,7 +53,7 @@ void dnodeInitModules() {
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem; tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem; tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem; tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = tsEnableMonitorModule ? -1 : 0; tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0; tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0; tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0;
} }
......
...@@ -1140,54 +1140,13 @@ static void mgmtReorganizeMetersInMetricMeta(STabObj *pMetric, SMetricMetaMsg *p ...@@ -1140,54 +1140,13 @@ static void mgmtReorganizeMetersInMetricMeta(STabObj *pMetric, SMetricMetaMsg *p
startPos[1] = (int32_t)pRes->num; startPos[1] = (int32_t)pRes->num;
} }
/* if pInfo->limit == 0, the query will be intercepted by sdk, and wont be
* sent to mnode */
assert(pInfo->limit == -1 || pInfo->limit > 0);
int32_t numOfTotal = 0;
if (pInfo->offset >= numOfSubset) {
numOfTotal = 0;
} else if (numOfSubset == 1) {
// no 'groupBy' clause, all tables returned
numOfTotal = pRes->num;
} else {
/* there is a offset value of group */
int32_t start = 0;
int32_t end = 0;
if (pInfo->orderType == TSQL_SO_ASC) {
start = startPos[pInfo->offset];
if (pInfo->limit + pInfo->offset >= numOfSubset || pInfo->limit == -1) {
/* all results are required */
end = startPos[numOfSubset];
} else {
end = startPos[pInfo->limit + pInfo->offset];
}
} else {
end = startPos[numOfSubset - pInfo->offset];
if (pInfo->limit + pInfo->offset >= numOfSubset || pInfo->limit == -1) {
start = startPos[0];
} else {
start = startPos[numOfSubset - pInfo->limit - pInfo->offset];
}
}
numOfTotal = end - start;
assert(numOfTotal > 0);
memmove(pRes->pRes, pRes->pRes + start, numOfTotal * POINTER_BYTES);
}
/* /*
* sort the result according to vgid to ensure meters with the same vgid is * sort the result according to vgid to ensure meters with the same vgid is
* continuous in the result list * continuous in the result list
*/ */
__compar_fn_t functor = (pRes->nodeType == TAST_NODE_TYPE_METER_PTR) ? tabObjVGIDComparator : nodeVGIDComparator; __compar_fn_t functor = (pRes->nodeType == TAST_NODE_TYPE_METER_PTR) ? tabObjVGIDComparator : nodeVGIDComparator;
qsort(pRes->pRes, numOfTotal, POINTER_BYTES, functor); qsort(pRes->pRes, (size_t) pRes->num, POINTER_BYTES, functor);
pRes->num = numOfTotal;
free(descriptor->pTagSchema); free(descriptor->pTagSchema);
free(descriptor); free(descriptor);
free(startPos); free(startPos);
......
...@@ -340,19 +340,33 @@ void vnodeCommitOver(SVnodeObj *pVnode) { ...@@ -340,19 +340,33 @@ void vnodeCommitOver(SVnodeObj *pVnode) {
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
} }
void vnodeCancelCommit(SVnodeObj *pVnode) { static void vnodeWaitForCommitComplete(SVnodeObj *pVnode) {
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool); SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
if (pPool == NULL) return;
pthread_mutex_lock(&pPool->vmutex); // wait for 100s at most
const int32_t totalCount = 1000;
int32_t count = 0;
if (pPool->commitInProcess) { // all meter is marked as dropped, so the commit will abort very quickly
pPool->commitInProcess = 0; while(count++ < totalCount) {
pthread_cancel(pVnode->commitThread); int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex);
commitInProcess = pPool->commitInProcess;
pthread_mutex_unlock(&pPool->vmutex);
if (commitInProcess) {
dWarn("vid:%d still in commit, wait for completed", pVnode->vnode);
taosMsleep(10);
}
} }
}
pthread_mutex_unlock(&pPool->vmutex); void vnodeCancelCommit(SVnodeObj *pVnode) {
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
if (pPool == NULL) return;
vnodeWaitForCommitComplete(pVnode);
taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer); taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer);
} }
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "tsdb.h" #include "tsdb.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeUtil.h"
typedef struct { typedef struct {
int sversion; int sversion;
...@@ -160,13 +161,17 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) { ...@@ -160,13 +161,17 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
if (*(int *)(cont+head.contLen) != simpleCheck) break; if (*(int *)(cont+head.contLen) != simpleCheck) break;
SMeterObj *pObj = pVnode->meterList[head.sid]; SMeterObj *pObj = pVnode->meterList[head.sid];
if (pObj == NULL) { if (pObj == NULL) {
dError( dError("vid:%d, sid:%d not exists, ignore data in commit log, contLen:%d action:%d",
"vid:%d, sid:%d not exists, ignore data in commit log, "
"contLen:%d action:%d",
vnode, head.sid, head.contLen, head.action); vnode, head.sid, head.contLen, head.action);
continue; continue;
} }
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, ignore data in commit log, contLen:%d action:%d",
vnode, head.sid, head.contLen, head.action);
continue;
}
int32_t numOfPoints = 0; int32_t numOfPoints = 0;
(*vnodeProcessAction[head.action])(pObj, cont, head.contLen, TSDB_DATA_SOURCE_LOG, NULL, head.sversion, (*vnodeProcessAction[head.action])(pObj, cont, head.contLen, TSDB_DATA_SOURCE_LOG, NULL, head.sversion,
&numOfPoints); &numOfPoints);
......
...@@ -408,7 +408,6 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) { ...@@ -408,7 +408,6 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) {
char dpath[TSDB_FILENAME_LEN] = "\0"; char dpath[TSDB_FILENAME_LEN] = "\0";
int fileId; int fileId;
int ret; int ret;
int file_removed = 0;
close(pVnode->nfd); close(pVnode->nfd);
pVnode->nfd = 0; pVnode->nfd = 0;
...@@ -449,14 +448,15 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) { ...@@ -449,14 +448,15 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) {
dTrace("vid:%d, %s and %s is saved", pVnode->vnode, pVnode->cfn, pVnode->lfn); dTrace("vid:%d, %s and %s is saved", pVnode->vnode, pVnode->cfn, pVnode->lfn);
if (pVnode->numOfFiles > pVnode->maxFiles) { // Retention policy here
fileId = pVnode->fileId - pVnode->numOfFiles + 1; fileId = pVnode->fileId - pVnode->numOfFiles + 1;
int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
while (fileId <= cfile - pVnode->maxFiles) {
vnodeRemoveFile(pVnode->vnode, fileId); vnodeRemoveFile(pVnode->vnode, fileId);
pVnode->numOfFiles--; pVnode->numOfFiles--;
file_removed = 1; fileId++;
} }
if (!file_removed) vnodeUpdateFileMagic(pVnode->vnode, pVnode->commitFileId);
vnodeSaveAllMeterObjToFile(pVnode->vnode); vnodeSaveAllMeterObjToFile(pVnode->vnode);
return; return;
...@@ -577,8 +577,20 @@ _again: ...@@ -577,8 +577,20 @@ _again:
// read compInfo // read compInfo
for (sid = 0; sid < pCfg->maxSessions; ++sid) { for (sid = 0; sid < pCfg->maxSessions; ++sid) {
if (pVnode->meterList == NULL) { // vnode is being freed, abort
goto _over;
}
pObj = (SMeterObj *)(pVnode->meterList[sid]); pObj = (SMeterObj *)(pVnode->meterList[sid]);
if (pObj == NULL) continue; if (pObj == NULL) {
continue;
}
// meter is going to be deleted, abort
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dWarn("vid:%d sid:%d is dropped, ignore this meter", vnode, sid);
continue;
}
pMeter = meterInfo + sid; pMeter = meterInfo + sid;
pHeader = ((SCompHeader *)tmem) + sid; pHeader = ((SCompHeader *)tmem) + sid;
...@@ -672,8 +684,9 @@ _again: ...@@ -672,8 +684,9 @@ _again:
pointsReadLast = pMeter->lastBlock.numOfPoints; pointsReadLast = pMeter->lastBlock.numOfPoints;
query.over = 0; query.over = 0;
headInfo.totalStorage -= (pointsReadLast * pObj->bytesPerPoint); headInfo.totalStorage -= (pointsReadLast * pObj->bytesPerPoint);
dTrace("vid:%d sid:%d id:%s, points:%d in last block will be merged to new block", dTrace("vid:%d sid:%d id:%s, points:%d in last block will be merged to new block",
pObj->vnode, pObj->sid, pObj->meterId, pointsReadLast); pObj->vnode, pObj->sid, pObj->meterId, pointsReadLast);
} }
pMeter->changed = 1; pMeter->changed = 1;
...@@ -717,8 +730,8 @@ _again: ...@@ -717,8 +730,8 @@ _again:
} }
dTrace("vid:%d sid:%d id:%s, %d points are committed, lastKey:%lld slot:%d pos:%d newNumOfBlocks:%d", dTrace("vid:%d sid:%d id:%s, %d points are committed, lastKey:%lld slot:%d pos:%d newNumOfBlocks:%d",
pObj->vnode, pObj->sid, pObj->meterId, pMeter->committedPoints, pObj->lastKeyOnFile, query.slot, query.pos, pObj->vnode, pObj->sid, pObj->meterId, pMeter->committedPoints, pObj->lastKeyOnFile, query.slot, query.pos,
pMeter->newNumOfBlocks); pMeter->newNumOfBlocks);
if (pMeter->committedPoints > 0) { if (pMeter->committedPoints > 0) {
pMeter->commitSlot = query.slot; pMeter->commitSlot = query.slot;
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "vnode.h" #include "vnode.h"
#include "vnodeMgmt.h" #include "vnodeMgmt.h"
#include "vnodeShell.h" #include "vnodeShell.h"
#include "vnodeShell.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#pragma GCC diagnostic ignored "-Wpointer-sign" #pragma GCC diagnostic ignored "-Wpointer-sign"
#pragma GCC diagnostic ignored "-Wint-conversion" #pragma GCC diagnostic ignored "-Wint-conversion"
...@@ -281,14 +282,32 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { ...@@ -281,14 +282,32 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
SShellObj * pShell = pImport->pShell; SShellObj * pShell = pImport->pShell;
pImport->retry++; pImport->retry++;
pObj->state = TSDB_METER_STATE_IMPORTING;
//slow query will block the import operation
int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state);
return;
}
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
//if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex); pthread_mutex_lock(&pPool->vmutex);
if (pPool->commitInProcess || pObj->numOfQueries > 0) { if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) {
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
pObj->state = TSDB_METER_STATE_READY; vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (pImport->retry < 1000) { if (pImport->retry < 1000) {
dTrace("vid:%d sid:%d id:%s, commit in process, try to import later", pObj->vnode, pObj->sid, pObj->meterId); dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
commitInProcess, num, state);
taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl); taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl);
return; return;
} else { } else {
...@@ -304,7 +323,8 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { ...@@ -304,7 +323,8 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
} }
} }
pObj->state = TSDB_METER_STATE_READY; vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++; pVnode->version++;
// send response back to shell // send response back to shell
...@@ -850,10 +870,13 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -850,10 +870,13 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} }
payload = pSubmit->payLoad; payload = pSubmit->payLoad;
if (pVnode->lastKeyOnFile > pVnode->cfg.daysToKeep * tsMsPerDay[pVnode->cfg.precision] + *((TSKEY *)(payload))) { int firstId = (*(TSKEY *)payload)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is too old to import, key:%lld", int lastId = (*(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, *(TSKEY *)(payload)); int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
return TSDB_CODE_OTHERS; if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) {
dError("vid:%d sid:%d id:%s, invalid timestamp to import, firstKey: %ld lastKey: %ld",
pObj->vnode, pObj->sid, pObj->meterId, *(TSKEY *)(payload), *(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)));
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
} }
if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
...@@ -862,15 +885,19 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -862,15 +885,19 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} }
if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) {
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported); code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported);
if (pShell) { if (pShell) {
pShell->code = code; pShell->code = code;
pShell->numOfTotalPoints += pointsImported; pShell->numOfTotalPoints += pointsImported;
} }
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else { } else {
SImportInfo *pNew, import; SImportInfo *pNew, import;
pObj->state = TSDB_METER_STATE_IMPORTING;
dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows); dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows);
memset(&import, 0, sizeof(import)); memset(&import, 0, sizeof(import));
import.firstKey = *((TSKEY *)(payload)); import.firstKey = *((TSKEY *)(payload));
...@@ -880,10 +907,16 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -880,10 +907,16 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
import.payload = payload; import.payload = payload;
import.rows = rows; import.rows = rows;
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex); pthread_mutex_lock(&pPool->vmutex);
if (pPool->commitInProcess || pObj->numOfQueries > 0) { if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
pObj->state = TSDB_METER_STATE_READY;
pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); pNew = (SImportInfo *)malloc(sizeof(SImportInfo));
memcpy(pNew, &import, sizeof(SImportInfo)); memcpy(pNew, &import, sizeof(SImportInfo));
...@@ -892,8 +925,9 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -892,8 +925,9 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pNew->payload = malloc(payloadLen); pNew->payload = malloc(payloadLen);
memcpy(pNew->payload, payload, payloadLen); memcpy(pNew->payload, payload, payloadLen);
dTrace("vid:%d sid:%d id:%s, commit/query:%d in process, import later, ", pObj->vnode, pObj->sid, pObj->meterId, dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
pObj->numOfQueries); pObj->meterId, commitInProcess, pObj->numOfQueries);
taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl);
return 0; return 0;
} else { } else {
...@@ -907,7 +941,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -907,7 +941,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} }
} }
pObj->state = TSDB_METER_STATE_READY;
pVnode->version++; pVnode->version++;
if (pShell) { if (pShell) {
...@@ -918,6 +951,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -918,6 +951,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
return 0; return 0;
} }
//todo abort from the procedure if the meter is going to be dropped
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
int code = 0; int code = 0;
......
...@@ -47,6 +47,8 @@ void vnodeFreeMeterObj(SMeterObj *pObj) { ...@@ -47,6 +47,8 @@ void vnodeFreeMeterObj(SMeterObj *pObj) {
if (vnodeList[pObj->vnode].meterList != NULL) { if (vnodeList[pObj->vnode].meterList != NULL) {
vnodeList[pObj->vnode].meterList[pObj->sid] = NULL; vnodeList[pObj->vnode].meterList[pObj->sid] = NULL;
} }
memset(pObj->meterId, 0, tListLen(pObj->meterId));
tfree(pObj); tfree(pObj);
} }
...@@ -143,7 +145,7 @@ int vnodeSaveMeterObjToFile(SMeterObj *pObj) { ...@@ -143,7 +145,7 @@ int vnodeSaveMeterObjToFile(SMeterObj *pObj) {
memcpy(buffer, pObj, offsetof(SMeterObj, reserved)); memcpy(buffer, pObj, offsetof(SMeterObj, reserved));
memcpy(buffer + offsetof(SMeterObj, reserved), pObj->schema, pObj->numOfColumns * sizeof(SColumn)); memcpy(buffer + offsetof(SMeterObj, reserved), pObj->schema, pObj->numOfColumns * sizeof(SColumn));
memcpy(buffer + offsetof(SMeterObj, reserved) + pObj->numOfColumns * sizeof(SColumn), pObj->pSql, pObj->sqlLen); memcpy(buffer + offsetof(SMeterObj, reserved) + pObj->numOfColumns * sizeof(SColumn), pObj->pSql, pObj->sqlLen);
taosCalcChecksumAppend(0, buffer, new_length); taosCalcChecksumAppend(0, (uint8_t *)buffer, new_length);
if (offset == 0 || length < new_length) { // New, append to file end if (offset == 0 || length < new_length) { // New, append to file end
fseek(fp, 0, SEEK_END); fseek(fp, 0, SEEK_END);
...@@ -208,7 +210,7 @@ int vnodeSaveAllMeterObjToFile(int vnode) { ...@@ -208,7 +210,7 @@ int vnodeSaveAllMeterObjToFile(int vnode) {
memcpy(buffer, pObj, offsetof(SMeterObj, reserved)); memcpy(buffer, pObj, offsetof(SMeterObj, reserved));
memcpy(buffer + offsetof(SMeterObj, reserved), pObj->schema, pObj->numOfColumns * sizeof(SColumn)); memcpy(buffer + offsetof(SMeterObj, reserved), pObj->schema, pObj->numOfColumns * sizeof(SColumn));
memcpy(buffer + offsetof(SMeterObj, reserved) + pObj->numOfColumns * sizeof(SColumn), pObj->pSql, pObj->sqlLen); memcpy(buffer + offsetof(SMeterObj, reserved) + pObj->numOfColumns * sizeof(SColumn), pObj->pSql, pObj->sqlLen);
taosCalcChecksumAppend(0, buffer, new_length); taosCalcChecksumAppend(0, (uint8_t *)buffer, new_length);
if (offset == 0 || length > new_length) { // New, append to file end if (offset == 0 || length > new_length) { // New, append to file end
new_offset = fseek(fp, 0, SEEK_END); new_offset = fseek(fp, 0, SEEK_END);
...@@ -391,7 +393,7 @@ int vnodeOpenMetersVnode(int vnode) { ...@@ -391,7 +393,7 @@ int vnodeOpenMetersVnode(int vnode) {
fseek(fp, offset, SEEK_SET); fseek(fp, offset, SEEK_SET);
if (fread(buffer, length, 1, fp) <= 0) break; if (fread(buffer, length, 1, fp) <= 0) break;
if (taosCheckChecksumWhole(buffer, length)) { if (taosCheckChecksumWhole((uint8_t *)buffer, length)) {
vnodeRestoreMeterObj(buffer, length - sizeof(TSCKSUM)); vnodeRestoreMeterObj(buffer, length - sizeof(TSCKSUM));
} else { } else {
dError("meter object file is broken since checksum mismatch, vnode: %d sid: %d, try to recover", vnode, sid); dError("meter object file is broken since checksum mismatch, vnode: %d sid: %d, try to recover", vnode, sid);
...@@ -440,7 +442,7 @@ int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec) { ...@@ -440,7 +442,7 @@ int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec) {
} }
dTrace("vid:%d sid:%d id:%s, update schema", pNew->vnode, pNew->sid, pNew->meterId); dTrace("vid:%d sid:%d id:%s, update schema", pNew->vnode, pNew->sid, pNew->meterId);
if (pObj->state != TSDB_METER_STATE_UPDATING) vnodeUpdateMeter(pNew, NULL); if (!vnodeIsMeterState(pObj, TSDB_METER_STATE_UPDATING)) vnodeUpdateMeter(pNew, NULL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -483,27 +485,20 @@ int vnodeRemoveMeterObj(int vnode, int sid) { ...@@ -483,27 +485,20 @@ int vnodeRemoveMeterObj(int vnode, int sid) {
if (vnodeList[vnode].meterList == NULL) return 0; if (vnodeList[vnode].meterList == NULL) return 0;
pObj = vnodeList[vnode].meterList[sid]; pObj = vnodeList[vnode].meterList[sid];
if ((pObj == NULL) || (pObj->state == TSDB_METER_STATE_DELETED)) return 0; if (pObj == NULL) {
if (pObj->state == TSDB_METER_STATE_IMPORTING) return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_SUCCESS;
}
int32_t retFlag = 0; if (!vnodeIsSafeToDeleteMeter(&vnodeList[vnode], sid)) {
pthread_mutex_lock(&vnodeList[vnode].vmutex); return TSDB_CODE_ACTION_IN_PROGRESS;
pObj->state = TSDB_METER_STATE_DELETING;
if (pObj->numOfQueries > 0) {
retFlag = TSDB_CODE_ACTION_IN_PROGRESS;
dWarn("vid:%d sid:%d id:%s %d queries executing on it, wait query to be finished",
vnode, pObj->sid, pObj->meterId, pObj->numOfQueries);
} }
pthread_mutex_unlock(&vnodeList[vnode].vmutex);
if (retFlag != 0) return retFlag;
// after remove this meter, change its stat to DELETED // after remove this meter, change its state to DELETED
pObj->state = TSDB_METER_STATE_DELETED; pObj->state = TSDB_METER_STATE_DELETED;
pObj->timeStamp = taosGetTimestampMs(); pObj->timeStamp = taosGetTimestampMs();
vnodeList[vnode].lastRemove = pObj->timeStamp; vnodeList[vnode].lastRemove = pObj->timeStamp;
vnodeRemoveStream(pObj); vnodeRemoveStream(pObj);
pObj->meterId[0] = 0;
vnodeSaveMeterObjToFile(pObj); vnodeSaveMeterObjToFile(pObj);
vnodeFreeMeterObj(pObj); vnodeFreeMeterObj(pObj);
...@@ -517,6 +512,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -517,6 +512,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
SSubmitMsg *pSubmit = (SSubmitMsg *)cont; SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
char * pData; char * pData;
TSKEY tsKey; TSKEY tsKey;
int cfile;
int points = 0; int points = 0;
int code = TSDB_CODE_SUCCESS; int code = TSDB_CODE_SUCCESS;
SVnodeObj * pVnode = vnodeList + pObj->vnode; SVnodeObj * pVnode = vnodeList + pObj->vnode;
...@@ -533,6 +529,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -533,6 +529,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
// to guarantee time stamp is the same for all vnodes // to guarantee time stamp is the same for all vnodes
pData = pSubmit->payLoad; pData = pSubmit->payLoad;
tsKey = taosGetTimestamp(pVnode->cfg.precision); tsKey = taosGetTimestamp(pVnode->cfg.precision);
cfile = tsKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
if (*((TSKEY *)pData) == 0) { if (*((TSKEY *)pData) == 0) {
for (i = 0; i < numOfPoints; ++i) { for (i = 0; i < numOfPoints; ++i) {
*((TSKEY *)pData) = tsKey++; *((TSKEY *)pData) = tsKey++;
...@@ -575,13 +572,24 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -575,13 +572,24 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
code = 0; code = 0;
TSKEY firstKey = *((TSKEY *)pData); TSKEY firstKey = *((TSKEY *)pData);
if (pVnode->lastKeyOnFile > pVnode->cfg.daysToKeep * tsMsPerDay[pVnode->cfg.precision] + firstKey) { int firstId = firstKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is too old to insert, key:%lld", pObj->vnode, pObj->sid, int lastId = (*(TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
pObj->meterId, pVnode->lastKeyOnFile, firstKey); if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) {
return TSDB_CODE_OTHERS; dError("vid:%d sid:%d id:%s, invalid timestamp to insert, firstKey: %ld lastKey: %ld ", pObj->vnode, pObj->sid,
pObj->meterId, firstKey, (*(TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1))));
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
} }
for (i = 0; i < numOfPoints; ++i) { for (i = 0; i < numOfPoints; ++i) {
// meter will be dropped, abort current insertion
if (pObj->state >= TSDB_METER_STATE_DELETING) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
break;
}
if (*((TSKEY *)pData) <= pObj->lastKey) { if (*((TSKEY *)pData) <= pObj->lastKey) {
dWarn("vid:%d sid:%d id:%s, received key:%ld not larger than lastKey:%ld", pObj->vnode, pObj->sid, pObj->meterId, dWarn("vid:%d sid:%d id:%s, received key:%ld not larger than lastKey:%ld", pObj->vnode, pObj->sid, pObj->meterId,
*((TSKEY *)pData), pObj->lastKey); *((TSKEY *)pData), pObj->lastKey);
...@@ -632,9 +640,11 @@ void vnodeProcessUpdateSchemaTimer(void *param, void *tmrId) { ...@@ -632,9 +640,11 @@ void vnodeProcessUpdateSchemaTimer(void *param, void *tmrId) {
pthread_mutex_lock(&pPool->vmutex); pthread_mutex_lock(&pPool->vmutex);
if (pPool->commitInProcess) { if (pPool->commitInProcess) {
dTrace("vid:%d sid:%d mid:%s, commiting in process, commit later", pObj->vnode, pObj->sid, pObj->meterId); dTrace("vid:%d sid:%d mid:%s, committing in process, commit later", pObj->vnode, pObj->sid, pObj->meterId);
if (taosTmrStart(vnodeProcessUpdateSchemaTimer, 10, pObj, vnodeTmrCtrl) == NULL) if (taosTmrStart(vnodeProcessUpdateSchemaTimer, 10, pObj, vnodeTmrCtrl) == NULL) {
pObj->state = TSDB_METER_STATE_READY; vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
}
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
return; return;
} }
...@@ -649,41 +659,54 @@ void vnodeUpdateMeter(void *param, void *tmrId) { ...@@ -649,41 +659,54 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
SMeterObj *pNew = (SMeterObj *)param; SMeterObj *pNew = (SMeterObj *)param;
if (pNew == NULL || pNew->vnode < 0 || pNew->sid < 0) return; if (pNew == NULL || pNew->vnode < 0 || pNew->sid < 0) return;
if (vnodeList[pNew->vnode].meterList == NULL) { SVnodeObj* pVnode = &vnodeList[pNew->vnode];
if (pVnode->meterList == NULL) {
dTrace("vid:%d sid:%d id:%s, vnode is deleted, abort update schema", pNew->vnode, pNew->sid, pNew->meterId); dTrace("vid:%d sid:%d id:%s, vnode is deleted, abort update schema", pNew->vnode, pNew->sid, pNew->meterId);
free(pNew->schema); free(pNew->schema);
free(pNew); free(pNew);
return; return;
} }
SMeterObj *pObj = vnodeList[pNew->vnode].meterList[pNew->sid]; SMeterObj *pObj = pVnode->meterList[pNew->sid];
if (pObj == NULL) { if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, meter is deleted, abort update schema", pNew->vnode, pNew->sid, pNew->meterId); dTrace("vid:%d sid:%d id:%s, meter is deleted, abort update schema", pNew->vnode, pNew->sid, pNew->meterId);
free(pNew->schema); free(pNew->schema);
free(pNew); free(pNew);
return; return;
} }
pObj->state = TSDB_METER_STATE_UPDATING; int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_UPDATING);
if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to update, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state);
return;
}
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
if (num > 0 || state != TSDB_METER_STATE_READY) {
dTrace("vid:%d sid:%d id:%s, update failed, retry later, numOfQueries:%d, state:%d",
pNew->vnode, pNew->sid, pNew->meterId, num, state);
if (pObj->numOfQueries > 0) { // retry update meter in 50ms
if (taosTmrStart(vnodeUpdateMeter, 50, pNew, vnodeTmrCtrl) == NULL) { if (taosTmrStart(vnodeUpdateMeter, 50, pNew, vnodeTmrCtrl) == NULL) {
dError("vid:%d sid:%d id:%s, failed to start update timer", pNew->vnode, pNew->sid, pNew->meterId); dError("vid:%d sid:%d id:%s, failed to start update timer, no retry", pNew->vnode, pNew->sid, pNew->meterId);
pObj->state = TSDB_METER_STATE_READY;
free(pNew->schema); free(pNew->schema);
free(pNew); free(pNew);
} }
dTrace("vid:%d sid:%d id:%s, there are ongoing queries, update later", pNew->vnode, pNew->sid, pNew->meterId);
return; return;
} }
// commit first // commit first
if (!vnodeIsCacheCommitted(pObj)) { if (!vnodeIsCacheCommitted(pObj)) {
// commit // commit data first
if (taosTmrStart(vnodeProcessUpdateSchemaTimer, 0, pObj, vnodeTmrCtrl) == NULL) { if (taosTmrStart(vnodeProcessUpdateSchemaTimer, 0, pObj, vnodeTmrCtrl) == NULL) {
dError("vid:%d sid:%d id:%s, failed to start commit timer", pObj->vnode, pObj->sid, pObj->meterId); dError("vid:%d sid:%d id:%s, failed to start commit timer", pObj->vnode, pObj->sid, pObj->meterId);
pObj->state = TSDB_METER_STATE_READY; vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
free(pNew->schema); free(pNew->schema);
free(pNew); free(pNew);
return; return;
...@@ -691,13 +714,14 @@ void vnodeUpdateMeter(void *param, void *tmrId) { ...@@ -691,13 +714,14 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
if (taosTmrStart(vnodeUpdateMeter, 50, pNew, vnodeTmrCtrl) == NULL) { if (taosTmrStart(vnodeUpdateMeter, 50, pNew, vnodeTmrCtrl) == NULL) {
dError("vid:%d sid:%d id:%s, failed to start update timer", pNew->vnode, pNew->sid, pNew->meterId); dError("vid:%d sid:%d id:%s, failed to start update timer", pNew->vnode, pNew->sid, pNew->meterId);
pObj->state = TSDB_METER_STATE_READY; vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
free(pNew->schema); free(pNew->schema);
free(pNew); free(pNew);
} }
dTrace("vid:%d sid:%d meterId:%s, there are data in cache, commit first, update later", dTrace("vid:%d sid:%d meterId:%s, there are data in cache, commit first, update later",
pNew->vnode, pNew->sid, pNew->meterId); pNew->vnode, pNew->sid, pNew->meterId);
vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
return; return;
} }
...@@ -716,7 +740,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) { ...@@ -716,7 +740,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
pObj->sversion = pNew->sversion; pObj->sversion = pNew->sversion;
vnodeSaveMeterObjToFile(pObj); vnodeSaveMeterObjToFile(pObj);
pObj->state = TSDB_METER_STATE_READY; vnodeClearMeterState(pObj, TSDB_METER_STATE_UPDATING);
dTrace("vid:%d sid:%d id:%s, schema is updated", pNew->vnode, pNew->sid, pNew->meterId); dTrace("vid:%d sid:%d id:%s, schema is updated", pNew->vnode, pNew->sid, pNew->meterId);
free(pNew); free(pNew);
......
...@@ -1730,6 +1730,17 @@ static int64_t getOldestKey(int32_t numOfFiles, int64_t fileId, SVnodeCfg *pCfg) ...@@ -1730,6 +1730,17 @@ static int64_t getOldestKey(int32_t numOfFiles, int64_t fileId, SVnodeCfg *pCfg)
bool isQueryKilled(SQuery *pQuery) { bool isQueryKilled(SQuery *pQuery) {
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
/*
* check if the queried meter is going to be deleted.
* if it will be deleted soon, stop current query ASAP.
*/
SMeterObj* pMeterObj = pQInfo->pObj;
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
pQInfo->killed = 1;
return true;
}
return (pQInfo->killed == 1); return (pQInfo->killed == 1);
} }
......
...@@ -15,12 +15,13 @@ ...@@ -15,12 +15,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vnodeShell.h"
#include <arpa/inet.h> #include <arpa/inet.h>
#include <assert.h> #include <assert.h>
#include <endian.h> #include <endian.h>
#include <stdint.h> #include <stdint.h>
#include "taosmsg.h" #include "taosmsg.h"
#include "vnode.h"
#include "vnodeShell.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "textbuffer.h" #include "textbuffer.h"
...@@ -28,6 +29,7 @@ ...@@ -28,6 +29,7 @@
#include "vnode.h" #include "vnode.h"
#include "vnodeRead.h" #include "vnodeRead.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#pragma GCC diagnostic ignored "-Wint-conversion" #pragma GCC diagnostic ignored "-Wint-conversion"
void * pShellServer = NULL; void * pShellServer = NULL;
...@@ -87,6 +89,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -87,6 +89,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle); dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
// set in query processing flag
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { } else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
...@@ -96,7 +99,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -96,7 +99,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
} else { } else {
dError("%s is not processed", taosMsg[pMsg->msgType]); dError("%s is not processed", taosMsg[pMsg->msgType]);
} }
return pObj; return pObj;
} }
...@@ -157,16 +160,31 @@ int vnodeOpenShellVnode(int vnode) { ...@@ -157,16 +160,31 @@ int vnodeOpenShellVnode(int vnode) {
return 0; return 0;
} }
void vnodeCloseShellVnode(int vnode) { static void vnodeDelayedFreeResource(void *param, void *tmrId) {
taosCloseRpcChann(pShellServer, vnode); int32_t vnode = *(int32_t*) param;
taosCloseRpcChann(pShellServer, vnode); // close connection
tfree (shellList[vnode]); //free SShellObj
tfree(param);
}
void vnodeCloseShellVnode(int vnode) {
if (shellList[vnode] == NULL) return; if (shellList[vnode] == NULL) return;
for (int i = 0; i < vnodeList[vnode].cfg.maxSessions; ++i) { for (int i = 0; i < vnodeList[vnode].cfg.maxSessions; ++i) {
vnodeFreeQInfo(shellList[vnode][i].qhandle, true); vnodeFreeQInfo(shellList[vnode][i].qhandle, true);
} }
tfree(shellList[vnode]); int32_t* v = malloc(sizeof(int32_t));
*v = vnode;
/*
* free the connection related resource after 5sec.
* 1. The msg, as well as SRpcConn may be in the task queue, free it immediate will cause crash
* 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access.
*/
dTrace("vid:%d, delay 5sec to free resources", vnode);
taosTmrStart(vnodeDelayedFreeResource, 5000, v, vnodeTmrCtrl);
} }
void vnodeCleanUpShell() { void vnodeCleanUpShell() {
...@@ -230,7 +248,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -230,7 +248,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
} }
if (pQueryMsg->numOfSids <= 0) { if (pQueryMsg->numOfSids <= 0) {
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over; goto _query_over;
} }
...@@ -245,7 +263,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -245,7 +263,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0) { if (pVnode->cfg.maxSessions == 0) {
dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode); dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode);
vnodeSendVpeerCfgMsg(pQueryMsg->vnode); vnodeSendVpeerCfgMsg(pQueryMsg->vnode);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _query_over; goto _query_over;
} }
...@@ -256,13 +274,13 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -256,13 +274,13 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pQueryMsg->pSidExtInfo == 0) { if (pQueryMsg->pSidExtInfo == 0) {
dTrace("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg); dTrace("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over; goto _query_over;
} }
if (pVnode->meterList == NULL) { if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode); dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _query_over; goto _query_over;
} }
...@@ -430,7 +448,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -430,7 +448,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pSubmit->numOfSid <= 0) { if (pSubmit->numOfSid <= 0) {
dError("invalid num of meters:%d", pSubmit->numOfSid); dError("invalid num of meters:%d", pSubmit->numOfSid);
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_INVALID_QUERY_MSG;
goto _submit_over; goto _submit_over;
} }
...@@ -444,7 +462,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -444,7 +462,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) { if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) {
dError("vid:%d is not activated for submit", pSubmit->vnode); dError("vid:%d is not activated for submit", pSubmit->vnode);
vnodeSendVpeerCfgMsg(pSubmit->vnode); vnodeSendVpeerCfgMsg(pSubmit->vnode);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _submit_over; goto _submit_over;
} }
...@@ -488,24 +506,39 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -488,24 +506,39 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int sversion = htonl(pBlocks->sversion); int sversion = htonl(pBlocks->sversion);
if (pMeterObj->state == TSDB_METER_STATE_READY) { int32_t state = TSDB_METER_STATE_READY;
if (pSubmit->import) if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
} else {
state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
}
if (state == TSDB_METER_STATE_READY) {
// meter status is ready for insert/import
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints); sversion, &numOfPoints);
else vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
code = vnodeInsertPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, } else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints); sversion, &numOfPoints);
if (code != 0) break; vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
} else if (pMeterObj->state >= TSDB_METER_STATE_DELETING) { }
dTrace("vid:%d sid:%d id:%s, is is removed, state:", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state); if (code != TSDB_CODE_SUCCESS) {break;}
code = TSDB_CODE_NOT_ACTIVE_SESSION; } else {
break; if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
} else { // importing state or others dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
dTrace("vid:%d sid:%d id:%s, try again since in state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterObj->state);
pMeterObj->state); code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_ACTION_IN_PROGRESS; break;
break; } else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pMeterObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
break;
}
} }
numOfTotalPoints += numOfPoints; numOfTotalPoints += numOfPoints;
......
...@@ -85,13 +85,42 @@ int vnodeOpenVnode(int vnode) { ...@@ -85,13 +85,42 @@ int vnodeOpenVnode(int vnode) {
return 0; return 0;
} }
void vnodeCloseVnode(int vnode) { static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) {
if (vnodeList == NULL) return; if (pVnode->meterList == NULL) {
assert(pVnode->cfg.maxSessions == 0);
return TSDB_CODE_SUCCESS;
}
bool ready = true;
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
if (!vnodeIsSafeToDeleteMeter(pVnode, sid)) {
ready = false;
} else { // set the meter is to be deleted
SMeterObj* pObj = pVnode->meterList[sid];
if (pObj != NULL) {
pObj->state = TSDB_METER_STATE_DELETED;
}
}
}
return ready? TSDB_CODE_SUCCESS:TSDB_CODE_ACTION_IN_PROGRESS;
}
int vnodeCloseVnode(int vnode) {
if (vnodeList == NULL) return TSDB_CODE_SUCCESS;
SVnodeObj* pVnode = &vnodeList[vnode];
pthread_mutex_lock(&dmutex); pthread_mutex_lock(&dmutex);
if (vnodeList[vnode].cfg.maxSessions == 0) { if (pVnode->cfg.maxSessions == 0) {
pthread_mutex_unlock(&dmutex); pthread_mutex_unlock(&dmutex);
return; return TSDB_CODE_SUCCESS;
}
// set the meter is dropped flag
if (vnodeMarkAllMetersDropped(pVnode) != TSDB_CODE_SUCCESS) {
pthread_mutex_unlock(&dmutex);
return TSDB_CODE_ACTION_IN_PROGRESS;
} }
vnodeCloseStream(vnodeList + vnode); vnodeCloseStream(vnodeList + vnode);
...@@ -111,6 +140,7 @@ void vnodeCloseVnode(int vnode) { ...@@ -111,6 +140,7 @@ void vnodeCloseVnode(int vnode) {
vnodeCalcOpenVnodes(); vnodeCalcOpenVnodes();
pthread_mutex_unlock(&dmutex); pthread_mutex_unlock(&dmutex);
return TSDB_CODE_SUCCESS;
} }
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) { int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) {
...@@ -182,25 +212,23 @@ void vnodeRemoveDataFiles(int vnode) { ...@@ -182,25 +212,23 @@ void vnodeRemoveDataFiles(int vnode) {
dTrace("vnode %d is removed!", vnode); dTrace("vnode %d is removed!", vnode);
} }
void vnodeRemoveVnode(int vnode) { int vnodeRemoveVnode(int vnode) {
if (vnodeList == NULL) return; if (vnodeList == NULL) return TSDB_CODE_SUCCESS;
if (vnodeList[vnode].cfg.maxSessions > 0) { if (vnodeList[vnode].cfg.maxSessions > 0) {
vnodeCloseVnode(vnode); int32_t ret = vnodeCloseVnode(vnode);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
vnodeRemoveDataFiles(vnode); vnodeRemoveDataFiles(vnode);
// sprintf(cmd, "rm -rf %s/vnode%d", tsDirectory, vnode);
// if ( system(cmd) < 0 ) {
// dError("vid:%d, failed to run command %s vnode, reason:%s", vnode, cmd, strerror(errno));
// } else {
// dTrace("vid:%d, this vnode is deleted!!!", vnode);
// }
} else { } else {
dTrace("vid:%d, max sessions:%d, this vnode already dropped!!!", vnode, vnodeList[vnode].cfg.maxSessions); dTrace("vid:%d, max sessions:%d, this vnode already dropped!!!", vnode, vnodeList[vnode].cfg.maxSessions);
vnodeList[vnode].cfg.maxSessions = 0; vnodeList[vnode].cfg.maxSessions = 0; //reset value
vnodeCalcOpenVnodes(); vnodeCalcOpenVnodes();
} }
return TSDB_CODE_SUCCESS;
} }
int vnodeInitStore() { int vnodeInitStore() {
......
...@@ -13,8 +13,9 @@ ...@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "taosmsg.h"
#include "vnode.h" #include "vnode.h"
#include <taosmsg.h> #include "vnodeUtil.h"
/* static TAOS *dbConn = NULL; */ /* static TAOS *dbConn = NULL; */
void vnodeCloseStreamCallback(void *param); void vnodeCloseStreamCallback(void *param);
...@@ -51,8 +52,17 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -51,8 +52,17 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
} }
contLen += sizeof(SSubmitMsg); contLen += sizeof(SSubmitMsg);
int32_t numOfPoints = 0; int32_t numOfPoints = 0;
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints);
int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT);
if (state == TSDB_METER_STATE_READY) {
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints);
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else {
dError("vid:%d sid:%d id:%s, failed to insert continuous query results, state:%d", pObj->vnode, pObj->sid,
pObj->meterId, state);
}
assert(numOfPoints >= 0 && numOfPoints <= 1); assert(numOfPoints >= 0 && numOfPoints <= 1);
tfree(pTemp); tfree(pTemp);
...@@ -76,7 +86,7 @@ void vnodeOpenStreams(void *param, void *tmrId) { ...@@ -76,7 +86,7 @@ void vnodeOpenStreams(void *param, void *tmrId) {
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
pObj = pVnode->meterList[sid]; pObj = pVnode->meterList[sid];
if (pObj == NULL || pObj->sqlLen == 0 || pObj->status == 1 || pObj->state == TSDB_METER_STATE_DELETED) continue; if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) continue;
dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql); dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql);
......
...@@ -361,6 +361,7 @@ void vnodeUpdateFilterColumnIndex(SQuery* pQuery) { ...@@ -361,6 +361,7 @@ void vnodeUpdateFilterColumnIndex(SQuery* pQuery) {
// TODO support k<12 and k<>9 // TODO support k<12 and k<>9
int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) { int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) {
for (int32_t i = 0; i < pQuery->numOfCols; ++i) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].data.filterOn > 0) { if (pQuery->colList[i].data.filterOn > 0) {
pQuery->numOfFilterCols++; pQuery->numOfFilterCols++;
...@@ -401,8 +402,6 @@ int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) { ...@@ -401,8 +402,6 @@ int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) {
pFilterInfo->fp = rangeFilterArray[2]; pFilterInfo->fp = rangeFilterArray[2];
} }
} else { } else {
assert(lower == TSDB_RELATION_LARGE);
if (upper == TSDB_RELATION_LESS_EQUAL) { if (upper == TSDB_RELATION_LESS_EQUAL) {
pFilterInfo->fp = rangeFilterArray[3]; pFilterInfo->fp = rangeFilterArray[3];
} else { } else {
...@@ -421,6 +420,7 @@ int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) { ...@@ -421,6 +420,7 @@ int32_t vnodeCreateFilterInfo(void* pQInfo, SQuery* pQuery) {
pFilterInfo->fp = filterArray[upper]; pFilterInfo->fp = filterArray[upper];
} }
} }
pFilterInfo->elemSize = bytes; pFilterInfo->elemSize = bytes;
j++; j++;
} }
...@@ -470,6 +470,18 @@ bool vnodeIsProjectionQuery(SSqlFunctionExpr* pExpr, int32_t numOfOutput) { ...@@ -470,6 +470,18 @@ bool vnodeIsProjectionQuery(SSqlFunctionExpr* pExpr, int32_t numOfOutput) {
return true; return true;
} }
/*
* the pMeter->state may be changed by vnodeIsSafeToDeleteMeter and import/update processor, the check of
* the state will not always be correct.
*
* The import/update/deleting is actually blocked by current query processing if the check of meter state is
* passed, but later queries are denied.
*
* 1. vnodeIsSafeToDelete will wait for this complete, since it also use the vmutex to check the numOfQueries
* 2. import will check the numOfQueries again after setting state to be TSDB_METER_STATE_IMPORTING, while the
* vmutex is also used.
* 3. insert has nothing to do with the query processing.
*/
int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSids, SMeterObj** pMeterObjList, int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSids, SMeterObj** pMeterObjList,
int32_t* numOfInc) { int32_t* numOfInc) {
SVnodeObj* pVnode = &vnodeList[pQueryMsg->vnode]; SVnodeObj* pVnode = &vnodeList[pQueryMsg->vnode];
...@@ -477,21 +489,24 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid ...@@ -477,21 +489,24 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
int32_t num = 0; int32_t num = 0;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// check all meter metadata to ensure all metadata are identical.
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
SMeterObj* pMeter = pVnode->meterList[pSids[i]->sid]; SMeterObj* pMeter = pVnode->meterList[pSids[i]->sid];
if (pMeter == NULL || pMeter->state != TSDB_METER_STATE_READY) { if (pMeter == NULL || (pMeter->state > TSDB_METER_STATE_INSERT)) {
if (pMeter == NULL) { if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DELETING)) {
code = TSDB_CODE_NOT_ACTIVE_SESSION; code = TSDB_CODE_NOT_ACTIVE_SESSION;
dError("qmsg:%p, vid:%d sid:%d, not there", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid); dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid);
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid); vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
} else { } else {//update or import
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
dTrace("qmsg:%p, vid:%d sid:%d id:%s, it is in state:%d, wait!", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid, dTrace("qmsg:%p, vid:%d sid:%d id:%s, it is in state:%d, wait!", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid,
pMeter->meterId, pMeter->state); pMeter->meterId, pMeter->state);
} }
} else { } else {
/*
* vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can
* check if the numOfQueries is 0 or not.
*/
pMeterObjList[(*numOfInc)++] = pMeter; pMeterObjList[(*numOfInc)++] = pMeter;
__sync_fetch_and_add(&pMeter->numOfQueries, 1); __sync_fetch_and_add(&pMeter->numOfQueries, 1);
...@@ -517,7 +532,6 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList, ...@@ -517,7 +532,6 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList,
SMeterObj* pMeter = pMeterObjList[i]; SMeterObj* pMeter = pMeterObjList[i];
if (pMeter != NULL) { // here, do not need to lock to perform operations if (pMeter != NULL) { // here, do not need to lock to perform operations
assert(pMeter->state != TSDB_METER_STATE_DELETING && pMeter->state != TSDB_METER_STATE_DELETED);
__sync_fetch_and_sub(&pMeter->numOfQueries, 1); __sync_fetch_and_sub(&pMeter->numOfQueries, 1);
if (pMeter->numOfQueries > 0) { if (pMeter->numOfQueries > 0) {
...@@ -571,3 +585,66 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) { ...@@ -571,3 +585,66 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) {
} }
} }
} }
int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state) {
return __sync_val_compare_and_swap(&pMeterObj->state, TSDB_METER_STATE_READY, state);
}
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state) {
pMeterObj->state &= (~state);
}
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state) {
if (state == TSDB_METER_STATE_READY) {
return pMeterObj->state == TSDB_METER_STATE_READY;
} else if (state == TSDB_METER_STATE_DELETING) {
return pMeterObj->state >= state;
} else {
return (((pMeterObj->state) & state) == state);
}
}
void vnodeSetMeterDeleting(SMeterObj* pMeterObj) {
if (pMeterObj == NULL) {
return;
}
pMeterObj->state |= TSDB_METER_STATE_DELETING;
}
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) {
SMeterObj* pObj = pVnode->meterList[sid];
if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETED)) {
return true;
}
int32_t prev = vnodeTransferMeterState(pObj, TSDB_METER_STATE_DELETING);
/*
* if the meter is not in ready/deleting state, it must be in insert/import/update,
* set the deleting state and wait the procedure to be completed
*/
if (prev != TSDB_METER_STATE_READY && prev < TSDB_METER_STATE_DELETING) {
vnodeSetMeterDeleting(pObj);
dWarn("vid:%d sid:%d id:%s, can not be deleted, state:%d, wait", pObj->vnode, pObj->sid, pObj->meterId, prev);
return false;
}
bool ready = true;
/*
* the query will be stopped ASAP, since the state of meter is set to TSDB_METER_STATE_DELETING,
* and new query will abort since the meter is deleted.
*/
pthread_mutex_lock(&pVnode->vmutex);
if (pObj->numOfQueries > 0) {
dWarn("vid:%d sid:%d id:%s %d queries executing on it, wait query to be finished",
pObj->vnode, pObj->sid, pObj->meterId, pObj->numOfQueries);
ready = false;
}
pthread_mutex_unlock(&pVnode->vmutex);
return ready;
}
...@@ -1532,7 +1532,7 @@ void tColModelDisplayEx(tColModel *pModel, void *pData, int32_t numOfRows, int32 ...@@ -1532,7 +1532,7 @@ void tColModelDisplayEx(tColModel *pModel, void *pData, int32_t numOfRows, int32
} }
//////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////
void tColModelCompress(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity) { void tColModelCompact(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity) {
if (inputBuffer->numOfElems == 0 || maxElemsCapacity == inputBuffer->numOfElems) { if (inputBuffer->numOfElems == 0 || maxElemsCapacity == inputBuffer->numOfElems) {
return; return;
} }
......
...@@ -117,7 +117,7 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p ...@@ -117,7 +117,7 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p
} }
} }
bool taosHasNoneInterpoPoints(SInterpolationInfo* pInterpoInfo) { return taosNumOfRemainPoints(pInterpoInfo) > 0; } bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) { return taosNumOfRemainPoints(pInterpoInfo) > 0; }
int32_t taosNumOfRemainPoints(SInterpolationInfo* pInterpoInfo) { int32_t taosNumOfRemainPoints(SInterpolationInfo* pInterpoInfo) {
if (pInterpoInfo->rowIdx == -1 || pInterpoInfo->numOfRawDataInRows == 0) { if (pInterpoInfo->rowIdx == -1 || pInterpoInfo->numOfRawDataInRows == 0) {
......
char version[64] = "1.6.1.2"; char version[64] = "1.6.1.4";
char compatible_version[64] = "1.6.0.0"; char compatible_version[64] = "1.6.1.0";
char gitinfo[128] = "ddcb2519e895c2e2101089aedaf529cee5cefe04"; char gitinfo[128] = "36936e19eb26b5e45107bca95394133e3ac7c3a1";
char buildinfo[512] = "Built by plum at 2019-07-29 10:41"; char buildinfo[512] = "Built by slguan at 2019-08-05 09:24";
...@@ -63,7 +63,7 @@ ...@@ -63,7 +63,7 @@
<dependency> <dependency>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>1.0.0</version> <version>1.0.1</version>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册