提交 f2016993 编写于 作者: H hjxilinx

add the union support in sql parser: fix some bugs. #1032. [TBASE-1140]

上级 c9827141
......@@ -110,7 +110,7 @@ void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex
void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex);
int32_t setMeterID(SSqlObj* pSql, int32_t subClauseIndex, SSQLToken* pzTableName, int32_t tableIndex);
int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertOrImportData(char* sqlstr);
......@@ -198,8 +198,8 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, int32_t subClauseIndex, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql);
int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex);
int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists);
int tscGetMeterMeta(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes);
......
......@@ -185,7 +185,7 @@ typedef struct STableDataBlocks {
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref
* to avoid it to be removed from cache
*/
SMeterMeta* pMeterMeta;
SMeterMeta *pMeterMeta;
union {
char *filename;
......@@ -208,7 +208,7 @@ typedef struct SDataBlockList {
} SDataBlockList;
typedef struct SQueryInfo {
uint16_t type; // query type
uint16_t type; // query/insert/import type
char intervalTimeUnit;
int64_t etime, stime;
......@@ -227,60 +227,42 @@ typedef struct SQueryInfo {
int16_t numOfTables;
SMeterMetaInfo **pMeterInfo;
struct STSBuf * tsBuf;
// todo use dynamic allocated memory for defaultVal
int64_t defaultVal[TSDB_MAX_COLUMNS]; // default value for interpolation
char* msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t * defaultVal; // default value for interpolation
char * msg; // pointer to the pCmd->payload to keep error message temporarily
} SQueryInfo;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef struct {
// SOrderVal order;
int command;
int count; // TODO refactor
uint8_t msgType;
union {
bool existsCheck; // check if the table exists
bool import; // import/insert type
bool existsCheck; // check if the table exists or not
bool inStream; // denote if current sql is executed in stream or not
bool createOnDemand; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
};
int8_t isInsertFromFile; // load data from file or not
uint8_t msgType;
union {
int32_t count;
int32_t numOfTablesInSubmit;
};
/*
* use to keep short request msg and error msg, in such case, SSqlCmd->payload == SSqlCmd->ext;
* create table/query/insert operations will exceed the TSDB_SQLCMD_SIZE.
*
* In such cases, allocate the memory dynamically, and need to free the memory
*/
short numOfCols;
uint32_t allocSize;
char * payload;
int payloadLen;
short numOfCols;
int64_t globalLimit;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
// char intervalTimeUnit;
// int64_t etime, stime;
// int64_t nAggTimeInterval; // aggregation time interval
// int64_t nSlidingTime; // sliding window in mseconds
// SSqlGroupbyExpr groupbyExpr; // group by tags info
//
// SColumnBaseInfo colList;
// SFieldInfo fieldsInfo;
// SSqlExprInfo exprsInfo;
// SLimitVal limit;
// SLimitVal slimit;
// STagCond tagCond;
// int16_t interpoType; // interpolate type
// int16_t numOfTables;
// SMeterMetaInfo **pMeterInfo;
// struct STSBuf * tsBuf;
// // todo use dynamic allocated memory for defaultVal
// int64_t defaultVal[TSDB_MAX_COLUMNS]; // default value for interpolation
// submit data blocks branched according to vnode
SDataBlockList * pDataBlocks;
SDataBlockList *pDataBlocks;
// for parameter ('?') binding and batch processing
int32_t batchSize;
......@@ -359,8 +341,8 @@ typedef struct _sql_obj {
SSqlCmd cmd;
SSqlRes res;
uint8_t numOfSubs;
char* asyncTblPos;
void* pTableHashList;
char * asyncTblPos;
void * pTableHashList;
struct _sql_obj **pSubs;
struct _sql_obj * prev, *next;
} SSqlObj;
......@@ -423,12 +405,12 @@ int taos_retrieve(TAOS_RES *res);
* transfer function for metric query in stream computing, the function need to be change
* before send query message to vnode
*/
int32_t tscTansformSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo);
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo);
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
int32_t tscCreateResPointerInfo(SQueryInfo* pQueryInfo, SSqlRes *pRes);
int32_t tscCreateResPointerInfo(SQueryInfo *pQueryInfo, SSqlRes *pRes);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd);
......@@ -450,13 +432,13 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj);
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj);
bool tscHasReachLimitation(SSqlObj* pSql);
bool tscHasReachLimitation(SSqlObj *pSql);
char* tscGetErrorMsgPayload(SSqlCmd* pCmd);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
......
......@@ -413,13 +413,13 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
SSqlCmd *pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
assert(!pCmd->isInsertFromFile && pSql->signature == pSql);
assert(pCmd->dataSourceType != 0 && pSql->signature == pSql);
int32_t index = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index, 0);
assert(pQueryInfo->numOfTables == 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
......@@ -456,7 +456,6 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj *pSql = (SSqlObj *)param;
if (pSql == NULL || pSql->signature != pSql) return;
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -480,7 +479,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL);
tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
tscGetMeterMeta(pSql, pMeterMetaInfo);
code = tscSendMsgToServer(pSql);
if (code != 0) {
pRes->code = code;
......@@ -513,7 +512,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p get metricMeta during metric query successfully", pSql);
code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -529,12 +528,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
} else { // stream computing
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
code = tscGetMetricMeta(pSql);
pRes->code = code;
......@@ -557,7 +555,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
*/
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
tscTansformSQLFunctionForMetricQuery(pQueryInfo);
tscTansformSQLFunctionForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream);
} else {
tscTrace("%p get meterMeta/metricMeta successfully", pSql);
......
......@@ -18,9 +18,8 @@
#define _XOPEN_SOURCE
#include <hash.h>
#include "os.h"
#include "ihash.h"
#include "os.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
......@@ -72,8 +71,6 @@ static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
}
int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
//char * token; //fang not used
//int tokenlen; //fang not used
int32_t index = 0;
SSQLToken sToken;
int64_t interval;
......@@ -118,7 +115,6 @@ int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int1
pTokenEnd += index;
if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
index = 0;
valueToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL);
pTokenEnd += index;
......@@ -194,7 +190,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
}
*((int8_t *)payload) = (int8_t) iv;
*((int8_t *)payload) = (int8_t)iv;
}
break;
......@@ -390,9 +386,9 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
}
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
int16_t timePrec, int32_t *code, char* tmpTokenBuf) {
int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0;
//bool isPrevOptr; //fang, never used
// bool isPrevOptr; //fang, never used
SSQLToken sToken = {0};
char * payload = pDataBlocks->pData + pDataBlocks->size;
......@@ -497,7 +493,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
}
int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMeta, int maxRows,
SParsedDataColInfo *spd, char *error, int32_t *code, char* tmpTokenBuf) {
SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0;
SSQLToken sToken;
......@@ -520,7 +516,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe
*str += index;
if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) {
int32_t tSize = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize);
if (0 == tSize) { //TODO pass the correct error code to client
if (0 == tSize) { // TODO pass the correct error code to client
strcpy(error, "client out of memory");
*code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return -1;
......@@ -588,7 +584,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) {
pDataBlock->pData = tmp;
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
} else {
//assert(false);
// assert(false);
// do nothing
pDataBlock->nAllocSize = nAllocSizeOld;
return 0;
......@@ -667,7 +663,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
}
int32_t code = TSDB_CODE_INVALID_SQL;
char* tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \"
char * tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \"
if (NULL == tmpTokenBuf) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
......@@ -679,7 +675,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
}
for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
SParamInfo* param = dataBuf->params + i;
SParamInfo *param = dataBuf->params + i;
if (param->idx == -1) {
param->idx = pCmd->numOfParams++;
param->offset -= sizeof(SShellSubmitBlock);
......@@ -700,16 +696,20 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
return TSDB_CODE_SUCCESS;
}
static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
int32_t index = 0;
SSQLToken sToken;
SSQLToken tableToken;
SSQLToken sToken = {0};
SSQLToken tableToken = {0};
int32_t code = TSDB_CODE_SUCCESS;
const int32_t TABLE_INDEX = 0;
const int32_t STABLE_INDEX = 1;
SSqlCmd * pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
char *sql = *sqlstr;
// get the token of specified table
index = 0;
tableToken = tStrGetToken(sql, &index, false, 0, NULL);
......@@ -747,40 +747,53 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
return TSDB_CODE_INVALID_SQL;
}
if (sToken.type == TK_USING) { // create table if not exists
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, TABLE_INDEX);
if (sToken.type == TK_USING) { // create table if not exists according to the super table
index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL);
sql += index;
STagData *pTag = (STagData *)pCmd->payload;
memset(pTag, 0, sizeof(STagData));
setMeterID(pSql, 0, &sToken, 0);
strncpy(pTag->name, pMeterMetaInfo->name, TSDB_METER_ID_LEN);
code = tscGetMeterMeta(pSql, pTag->name, 0);
/*
* the source super table is moved to the secondary position of the pMeterMetaInfo list
*/
if (pQueryInfo->numOfTables < 2) {
tscAddEmptyMeterMetaInfo(pQueryInfo);
}
SMeterMetaInfo *pSTableMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, STABLE_INDEX);
setMeterID(pSTableMeterMetaInfo, &sToken, pSql);
strncpy(pTag->name, pSTableMeterMetaInfo->name, TSDB_METER_ID_LEN);
code = tscGetMeterMeta(pSql, pSTableMeterMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (!UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
if (!UTIL_METER_IS_SUPERTABLE(pSTableMeterMetaInfo)) {
return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
}
SSchema *pTagSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta);
SSchema *pTagSchema = tsGetTagSchema(pSTableMeterMetaInfo->pMeterMeta);
index = 0;
sToken = tStrGetToken(sql, &index, false, 0, NULL);
sql += index;
SParsedDataColInfo spd = {0};
uint8_t numOfTags = pMeterMetaInfo->pMeterMeta->numOfTags;
uint8_t numOfTags = pSTableMeterMetaInfo->pMeterMeta->numOfTags;
spd.numOfCols = numOfTags;
// if specify some tags column
if (sToken.type != TK_LP) {
tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags);
} else {
/* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen) tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
/* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen)
* tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */
int16_t offset[TSDB_MAX_COLUMNS] = {0};
for (int32_t t = 1; t < numOfTags; ++t) {
offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes;
......@@ -841,7 +854,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
uint32_t ignoreTokenTypes = TK_LP;
uint32_t numOfIgnoreToken = 1;
for (int i = 0; i < spd.numOfAssignedCols; ++i) {
char* tagVal = pTag->data + spd.elems[i].offset;
char * tagVal = pTag->data + spd.elems[i].offset;
int16_t colIndex = spd.elems[i].colIndex;
index = 0;
......@@ -859,13 +872,14 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
sToken.n -= 2;
}
code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, pMeterMetaInfo->pMeterMeta->precision);
code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false,
pSTableMeterMetaInfo->pMeterMeta->precision);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if ((pTagSchema[colIndex].type == TSDB_DATA_TYPE_BINARY ||
pTagSchema[colIndex].type == TSDB_DATA_TYPE_NCHAR) && sToken.n > pTagSchema[colIndex].bytes) {
if ((pTagSchema[colIndex].type == TSDB_DATA_TYPE_BINARY || pTagSchema[colIndex].type == TSDB_DATA_TYPE_NCHAR) &&
sToken.n > pTagSchema[colIndex].bytes) {
return tscInvalidSQLErrMsg(pCmd->payload, "string too long", sToken.z);
}
}
......@@ -894,20 +908,20 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) {
return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
}
int32_t ret = setMeterID(pSql, 0, &tableToken, 0);
int32_t ret = setMeterID(pMeterMetaInfo, &tableToken, pSql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
createTable = true;
code = tscGetMeterMetaEx(pSql, pMeterMetaInfo->name, true);
code = tscGetMeterMetaEx(pSql, pMeterMetaInfo, true);
} else {
if (cstart != NULL) {
sql = cstart;
} else {
sql = sToken.z;
}
code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
}
int32_t len = cend - cstart + 1;
......@@ -932,6 +946,15 @@ int validateTableName(char *tblName, int len) {
return tscValidateName(&token);
}
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
if (pCmd->dataSourceType != 0 && pCmd->dataSourceType != type) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
}
pCmd->dataSourceType = type;
return TSDB_CODE_SUCCESS;
}
/**
* usage: insert into table1 values() () table2 values()()
*
......@@ -945,16 +968,18 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
SSqlCmd *pCmd = &pSql->cmd;
int32_t totalNum = 0;
SQueryInfo* pQueryInfo = NULL;
SMeterMetaInfo* pMeterMetaInfo = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
SMeterMetaInfo *pMeterMetaInfo = NULL;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
assert(pQueryInfo != NULL);
if (pQueryInfo->numOfTables == 0) {
pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
} else {
pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
// assert(pQueryInfo->numOfTables == 1);
}
if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
......@@ -978,16 +1003,26 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
while (1) {
int32_t index = 0;
SSQLToken sToken = tStrGetToken(str, &index, false, 0, NULL);
if (sToken.n == 0) { // parse file, do not release the STableDataBlock
if (pCmd->isInsertFromFile == 1) {
// no data in the sql string anymore.
if (sToken.n == 0) {
/*
* if the data is from the data file, no data has been generated yet. So, there no data to
* merge or submit, save the file path and parse the file in other routines.
*/
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
goto _clean;
}
if (totalNum > 0) {
break;
} else { // no data in current sql string, error
/*
* if no data has been generated during parsing the sql string, error msg will return
* Otherwise, create the first submit block and submit to virtual node.
*/
if (totalNum == 0) {
code = TSDB_CODE_INVALID_SQL;
goto _error_clean;
} else {
break;
}
}
......@@ -999,22 +1034,21 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
//TODO refactor
if ((code = setMeterID(pSql, 0, &sToken, 0)) != TSDB_CODE_SUCCESS) {
if ((code = setMeterID(pMeterMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
void *fp = pSql->fp;
if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
/*
* For async insert, after get the metermeta from server, the sql string will not be
* parsed using the new metermeta to avoid the overhead cause by get metermeta data information.
* And during the getMeterMetaCallback function, the sql string will be parsed from the
* interrupted position.
*/
if (fp != NULL) {
//goto _clean;
return code;
} else {
/*
* for async insert, the free data block operations, which is tscDestroyBlockArrayList,
* must be executed before launch another threads to get metermeta, since the
* later ops may manipulate SSqlObj through another thread in getMeterMetaCallback function.
*/
goto _error_clean;
}
}
......@@ -1027,8 +1061,9 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL);
str += index;
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z);
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z);
goto _error_clean;
}
......@@ -1038,14 +1073,9 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
tscSetAssignedColumnInfo(&spd, pSchema, pMeterMetaInfo->pMeterMeta->numOfColumns);
if (pCmd->isInsertFromFile == -1) {
pCmd->isInsertFromFile = 0;
} else {
if (pCmd->isInsertFromFile == 1) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
}
/*
* app here insert data in different vnodes, so we need to set the following
......@@ -1056,14 +1086,9 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
} else if (sToken.type == TK_FILE) {
if (pCmd->isInsertFromFile == -1) {
pCmd->isInsertFromFile = 1;
} else {
if (pCmd->isInsertFromFile == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
}
index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL);
......@@ -1099,10 +1124,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0, 0)->pMeterMeta;
SSchema * pSchema = tsGetSchema(pMeterMeta);
if (pCmd->isInsertFromFile == -1) {
pCmd->isInsertFromFile = 0;
} else if (pCmd->isInsertFromFile == 1) {
code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
......@@ -1223,24 +1245,29 @@ int tsParseInsertSql(SSqlObj *pSql) {
int32_t index = 0;
SSqlCmd *pCmd = &pSql->cmd;
char* sql = pSql->sqlstr;
SSQLToken sToken = tStrGetToken(sql, &index, false, 0, NULL);
SSQLToken sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT);
pCmd->import = (sToken.type == TK_IMPORT);
sToken = tStrGetToken(sql, &index, false, 0, NULL);
pCmd->count = 0;
pCmd->command = TSDB_SQL_INSERT;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
if (sToken.type == TK_INSERT) {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
} else {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_IMPORT);
}
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
if (sToken.type != TK_INTO) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z);
}
pCmd->count = 0;
pCmd->command = TSDB_SQL_INSERT;
pCmd->isInsertFromFile = -1;
pSql->res.numOfRows = 0;
return doParseInsertSql(pSql, sql + index);
return doParseInsertSql(pSql, pSql->sqlstr + index);
}
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
......@@ -1351,7 +1378,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
while ((readLen = getline(&line, &n, fp)) != -1) {
// line[--readLen] = '\0';
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0;
if (readLen == 0) continue; //fang, <= to ==
if (readLen == 0) continue; // fang, <= to ==
char *lineptr = line;
strtolower(line, line);
......@@ -1362,7 +1389,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
maxRows += tSize;
}
len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, pMeterMeta->precision, &code, tmpTokenBuf);
len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, pMeterMeta->precision, &code,
tmpTokenBuf);
if (len <= 0 || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code;
return (-code);
......@@ -1425,7 +1453,7 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
int32_t code = TSDB_CODE_SUCCESS;
/* the first block has been sent to server in processSQL function */
assert(pCmd->isInsertFromFile != -1 && pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL);
assert(pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL);
if (pMeterMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) {
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
......@@ -1437,7 +1465,8 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
}
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex, pDataBlocks->nSize);
tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex,
pDataBlocks->nSize);
continue;
}
......@@ -1450,17 +1479,19 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
}
// multi-vnodes insertion in sync query model
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) {
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
if (pCmd->command != TSDB_SQL_INSERT) {
return;
}
SMeterMetaInfo * pInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
STableDataBlocks *pDataBlock = NULL;
int32_t affected_rows = 0;
assert(pCmd->isInsertFromFile == 1 && pCmd->pDataBlocks != NULL);
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL);
SDataBlockList *pDataBlockList = pCmd->pDataBlocks;
pCmd->pDataBlocks = NULL;
......@@ -1486,16 +1517,16 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) {
continue;
}
strncpy(pInfo->name, pDataBlock->meterId, TSDB_METER_ID_LEN);
strncpy(pMeterMetaInfo->name, pDataBlock->meterId, TSDB_METER_ID_LEN);
memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);
int32_t ret = tscGetMeterMeta(pSql, pInfo->name, 0);
int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p get meter meta failed, abort", pSql);
continue;
}
char* tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \"
char *tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \"
if (NULL == tmpTokenBuf) {
tscError("%p calloc failed", pSql);
continue;
......
......@@ -65,8 +65,8 @@ static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t na
static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName);
static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem);
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, int8_t type,
char* fieldName);
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
int8_t type, char* fieldName);
static int32_t changeFunctionID(int32_t optr, int16_t* functionId);
static int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isMetric);
......@@ -104,7 +104,7 @@ static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex);
static int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuerySql);
static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql);
static int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t optrToString(tSQLExpr* pExpr, char** exprString);
static int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
......@@ -127,7 +127,7 @@ static int32_t invalidSqlErrMsg(char* dstBuffer, const char* errMsg) {
return tscInvalidSQLErrMsg(dstBuffer, errMsg, NULL);
}
static int32_t tscQueryOnlyMetricTags(SQueryInfo *pQueryInfo, bool* queryOnMetricTags) {
static int32_t tscQueryOnlyMetricTags(SQueryInfo* pQueryInfo, bool* queryOnMetricTags) {
assert(QUERY_IS_STABLE_QUERY(pQueryInfo->type));
*queryOnMetricTags = true;
......@@ -207,6 +207,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
int32_t code = tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
assert(pQueryInfo->numOfTables == 0);
SMeterMetaInfo* pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
pCmd->command = pInfo->type;
......@@ -237,7 +238,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} else if (pInfo->type == TSDB_SQL_DROP_TABLE) {
assert(pInfo->pDCLInfo->nTokens == 1);
if (setMeterID(pSql, 0, pzName, 0) != TSDB_CODE_SUCCESS) {
if (setMeterID(pMeterMetaInfo, pzName, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
......@@ -370,11 +371,11 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (setMeterID(pSql, 0, pToken, 0) != TSDB_CODE_SUCCESS) {
if (setMeterID(pMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
return tscGetMeterMeta(pSql, pMeterMetaInfo);
}
case TSDB_SQL_CFG_DNODE: {
......@@ -628,7 +629,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
return insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName);
}
int32_t setSlidingClause(SQueryInfo *pQueryInfo, SQuerySQL* pQuerySql) {
int32_t setSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
const char* msg0 = "sliding value too small";
const char* msg1 = "sliding value no larger than the interval value";
......@@ -653,11 +654,10 @@ int32_t setSlidingClause(SQueryInfo *pQueryInfo, SQuerySQL* pQuerySql) {
return TSDB_CODE_SUCCESS;
}
int32_t setMeterID(SSqlObj* pSql, int32_t subClauseIndex, SSQLToken* pzTableName, int32_t tableIndex) {
int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql) {
const char* msg = "name too long";
SSqlCmd* pCmd = &pSql->cmd;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, subClauseIndex, tableIndex);
int32_t code = TSDB_CODE_SUCCESS;
if (hasSpecifyDB(pzTableName)) {
......@@ -1032,7 +1032,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isSTable
const char* msg1 = "invalid column name/illegal column type in arithmetic expression";
const char* msg2 = "functions can not be mixed up";
const char* msg3 = "not support query expression";
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
for (int32_t i = 0; i < pSelection->nExpr; ++i) {
int32_t outputIndex = pQueryInfo->fieldsInfo.numOfOutputCols;
......@@ -1124,7 +1124,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isSTable
* transfer sql functions that need secondary merge into another format
* in dealing with metric queries such as: count/first/last
*/
tscTansformSQLFunctionForMetricQuery(pQueryInfo);
tscTansformSQLFunctionForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForMetricQuery(pQueryInfo)) {
return TSDB_CODE_INVALID_SQL;
......@@ -1215,10 +1215,10 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn
insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, pExpr->resType, colName);
}
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
SSchema* pColSchema, int16_t flag) {
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type, pColSchema->bytes,
pColSchema->bytes);
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) {
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type,
pColSchema->bytes, pColSchema->bytes);
SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex);
if (TSDB_COL_IS_TAG(flag)) {
......@@ -1489,7 +1489,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
if (optr == TK_DIFF) {
colIdx += 1;
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE);
tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE,
TSDB_KEYSIZE);
SColumnList ids = getColumnList(1, 0, 0);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName);
......@@ -1599,7 +1600,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
for (int32_t i = 0; i < pMeterMetaInfo->pMeterMeta->numOfColumns; ++i) {
SColumnIndex index = {.tableIndex = j, .columnIndex = i};
if (setExprInfoForFunctions(pQueryInfo, pSchema, functionID, pItem->aliasName, colIdx + i + j, &index) != 0) {
if (setExprInfoForFunctions(pQueryInfo, pSchema, functionID, pItem->aliasName, colIdx + i + j, &index) !=
0) {
return TSDB_CODE_INVALID_SQL;
}
}
......@@ -1778,7 +1780,7 @@ static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SSQLToken
return columnIndex;
}
int32_t doGetColumnIndexByName(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIndex* pIndex) {
int32_t doGetColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
const char* msg0 = "ambiguous column name";
const char* msg1 = "invalid column name";
......@@ -1849,7 +1851,7 @@ int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnInd
return TSDB_CODE_SUCCESS;
}
int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIndex* pIndex) {
int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
SSQLToken tableToken = {0};
extractTableNameFromToken(pToken, &tableToken);
......@@ -1860,7 +1862,7 @@ int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIn
return TSDB_CODE_SUCCESS;
}
int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIndex* pIndex) {
int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
if (pQueryInfo->pMeterInfo == NULL || pQueryInfo->numOfTables == 0) {
return TSDB_CODE_INVALID_SQL;
}
......@@ -2057,7 +2059,7 @@ bool validateIpAddress(const char* ip, size_t size) {
return ipAddr != INADDR_NONE;
}
int32_t tscTansformSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) {
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if (pMeterMetaInfo->pMeterMeta == NULL || !UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
......@@ -2359,8 +2361,8 @@ static SColumnFilterInfo* addColumnFilterInfo(SColumnBase* pColumn) {
return pColFilterInfo;
}
static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterInfo* pColumnFilter, SColumnIndex* columnIndex,
tSQLExpr* pExpr) {
static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterInfo* pColumnFilter,
SColumnIndex* columnIndex, tSQLExpr* pExpr) {
const char* msg = "not supported filter condition";
tSQLExpr* pRight = pExpr->pRight;
......@@ -3147,7 +3149,8 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S
return ret;
}
int32_t getQueryCondExpr(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SCondExpr* pCondExpr, int32_t* type, int32_t parentOptr) {
int32_t getQueryCondExpr(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SCondExpr* pCondExpr, int32_t* type,
int32_t parentOptr) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
......@@ -3276,7 +3279,8 @@ int tableNameCompar(const void* lhs, const void* rhs) {
return ret > 0 ? 1 : -1;
}
static int32_t setTableCondForMetricQuery(SQueryInfo *pQueryInfo, const char* account, tSQLExpr* pExpr, int16_t tableCondIndex, SStringBuilder* sb) {
static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr,
int16_t tableCondIndex, SStringBuilder* sb) {
const char* msg = "table name too long";
if (pExpr == NULL) {
......@@ -3718,6 +3722,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
tVariantListItem* pItem = &pFillToken->a[0];
const int32_t START_INTERPO_COL_IDX = 1;
const char* msg = "illegal value or data overflow";
const char* msg1 = "value is expected";
const char* msg2 = "invalid fill option";
......@@ -3726,6 +3731,13 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
if (pQueryInfo->defaultVal == NULL) {
pQueryInfo->defaultVal = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(int64_t));
if (pQueryInfo->defaultVal == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
}
if (strncasecmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_NONE;
} else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) {
......@@ -3737,7 +3749,6 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
} else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_PREV;
} else if (strncasecmp(pItem->pVar.pz, "linear", 6) == 0 && pItem->pVar.nLen == 6) {
// not support yet
pQueryInfo->interpoType = TSDB_INTERPO_LINEAR;
} else if (strncasecmp(pItem->pVar.pz, "value", 5) == 0 && pItem->pVar.nLen == 5) {
pQueryInfo->interpoType = TSDB_INTERPO_SET_VALUE;
......@@ -3767,14 +3778,15 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
for (int32_t i = startPos; i < numOfFillVal; ++i, ++j) {
TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes);
continue;
}
int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type);
if (ret != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg);
}
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes);
}
}
if ((pFillToken->nExpr < pQueryInfo->fieldsInfo.numOfOutputCols) ||
......@@ -3980,11 +3992,11 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
if (setMeterID(pSql, 0, &(pAlterSQL->name), 0) != TSDB_CODE_SUCCESS) {
if (setMeterID(pMeterMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo->name, DEFAULT_TABLE_INDEX);
int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -4087,13 +4099,11 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
char name[128] = {0};
strncpy(name, pVarList->a[0].pVar.pz, pVarList->a[0].pVar.nLen);
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 0, TSDB_DATA_TYPE_INT, name,
tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize);
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 0, TSDB_DATA_TYPE_INT, name, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize);
memset(name, 0, tListLen(name));
strncpy(name, pVarList->a[1].pVar.pz, pVarList->a[1].pVar.nLen);
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 1, TSDB_DATA_TYPE_INT, name,
tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize);
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 1, TSDB_DATA_TYPE_INT, name, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
const char* msg1 = "invalid tag value";
const char* msg2 = "update normal column not supported";
......@@ -5086,7 +5096,7 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) {
// for debug purpose
void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
if (pCmd == NULL || pQueryInfo->exprsInfo.numOfExprs == 0) {
return;
......@@ -5119,6 +5129,7 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SCreateTableSQL* pCreateTable = pInfo->pCreateTableInfo;
......@@ -5134,7 +5145,7 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (setMeterID(pSql, 0, pzTableName, 0) != TSDB_CODE_SUCCESS) {
if (setMeterID(pMeterMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -5170,7 +5181,16 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
SCreateTableSQL* pCreateTable = pInfo->pCreateTableInfo;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
// two table: the first one is for current table, and the secondary is for the super table.
tscAddEmptyMeterMetaInfo(pQueryInfo);
assert(pQueryInfo->numOfTables == 2);
const int32_t TABLE_INDEX = 0;
const int32_t STABLE_INDEX = 1;
SMeterMetaInfo* pStableMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, STABLE_INDEX);
// super table name, create table by using dst
SSQLToken* pToken = &(pCreateTable->usingInfo.stableName);
......@@ -5179,25 +5199,25 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (setMeterID(pSql, 0, pToken, 0) != TSDB_CODE_SUCCESS) {
if (setMeterID(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
// get meter meta from mnode
strncpy(pCreateTable->usingInfo.tagdata.name, pMeterMetaInfo->name, TSDB_METER_ID_LEN);
strncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, TSDB_METER_ID_LEN);
tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals;
int32_t code = tscGetMeterMeta(pSql, pCreateTable->usingInfo.tagdata.name, 0);
int32_t code = tscGetMeterMeta(pSql, pStableMeterMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pMeterMetaInfo->pMeterMeta->numOfTags != pList->nExpr) {
if (pStableMeterMetaInfo->pMeterMeta->numOfTags != pList->nExpr) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
// too long tag values will return invalid sql, not be truncated automatically
SSchema* pTagSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta);
SSchema* pTagSchema = tsGetTagSchema(pStableMeterMetaInfo->pMeterMeta);
char* tagVal = pCreateTable->usingInfo.tagdata.data;
for (int32_t i = 0; i < pList->nExpr; ++i) {
......@@ -5220,7 +5240,8 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
int32_t ret = setMeterID(pSql, 0, &pInfo->pCreateTableInfo->name, 0);
SMeterMetaInfo* pTableMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, TABLE_INDEX);
int32_t ret = setMeterID(pTableMeterMetaInfo, &pInfo->pCreateTableInfo->name, pSql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -5237,6 +5258,10 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
assert(pQueryInfo->numOfTables == 1);
// if (pQueryInfo->numOfTables == 1) {
// tscAddEmptyMeterMetaInfo(pQueryInfo);
// }
SCreateTableSQL* pCreateTable = pInfo->pCreateTableInfo;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -5257,11 +5282,11 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
if (setMeterID(pSql, 0, &srcToken, 0) != TSDB_CODE_SUCCESS) {
if (setMeterID(pMeterMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
int32_t code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
int32_t code = tscGetMeterMeta(pSql, pMeterMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -5292,7 +5317,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
}
// set the created table[stream] name
if (setMeterID(pSql, 0, pzTableName, 0) != TSDB_CODE_SUCCESS) {
if (setMeterID(pMeterMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
......@@ -5351,7 +5376,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
SSqlCmd* pCmd = &pSql->cmd;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
// too many result columns not support order by in query
if (pQuerySql->pSelection->nExpr > TSDB_MAX_COLUMNS) {
......@@ -5390,25 +5416,26 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
if (pQueryInfo->numOfTables <= i) { // more than one table
tscAddEmptyMeterMetaInfo(pQueryInfo);
}
SMeterMetaInfo* pMeterInfo1 = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
SSQLToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz};
if (setMeterID(pSql, index, &t, i) != TSDB_CODE_SUCCESS) {
if (setMeterID(pMeterInfo1, &t, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
SMeterMetaInfo* pMeterInfo1 = tscGetMeterMetaInfo(pCmd, index, i);
code = tscGetMeterMeta(pSql, pMeterInfo1->name, i);
code = tscGetMeterMeta(pSql, pMeterInfo1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr);
// parse the group by clause in the first place
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
if (parseGroupbyClause(pQueryInfo, pQuerySql->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
}
......@@ -5431,8 +5458,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
// set sliding value
SSQLToken* pSliding = &pQuerySql->sliding;
if (pSliding->n != 0) {
// TODO refactor pCmd->count == 1 means sql in stream function
if (!tscEmbedded && pCmd->count == 0) {
if (!tscEmbedded && pCmd->inStream == 0) { // sliding only allowed in stream
const char* msg = "not support sliding in query";
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
......@@ -5476,8 +5502,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
// user does not specified the query time window, twa is not allowed in such case.
if ((pQueryInfo->stime == 0 || pQueryInfo->etime == INT64_MAX ||
(pQueryInfo->etime == INT64_MAX / 1000 &&
pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI)) &&
(pQueryInfo->etime == INT64_MAX / 1000 && pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI)) &&
tscIsTWAQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
......@@ -5492,26 +5517,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
if (pQuerySql->fillType != NULL) {
if (pQueryInfo->nAggTimeInterval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
if (pQueryInfo->nAggTimeInterval > 0) {
int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime);
// number of result is not greater than 10,000,000
if ((timeRange == 0) ||
(timeRange / pQueryInfo->nAggTimeInterval) > MAX_RETRIEVE_ROWS_IN_INTERVAL_QUERY) {
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
}
}
int32_t ret = parseFillClause(pQueryInfo, pQuerySql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
}
// in case of join query, time range is required.
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime);
......@@ -5539,5 +5544,28 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
updateTagColumnIndex(pQueryInfo, i);
}
/*
* fill options are set at the end position, when all columns are set properly
* the columns may be increased due to group by operation
*/
if (pQuerySql->fillType != NULL) {
if (pQueryInfo->nAggTimeInterval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
if (pQueryInfo->nAggTimeInterval > 0) {
int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime);
// number of result is not greater than 10,000,000
if ((timeRange == 0) || (timeRange / pQueryInfo->nAggTimeInterval) > MAX_RETRIEVE_ROWS_IN_INTERVAL_QUERY) {
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
}
}
int32_t ret = parseFillClause(pQueryInfo, pQuerySql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
}
return TSDB_CODE_SUCCESS; // Does not build query message here
}
......@@ -589,6 +589,8 @@ void destroyAllSelectClause(SSubclauseInfo *pClause) {
SQuerySQL *pQuerySql = pClause->pClause[i];
doDestroyQuerySql(pQuerySql);
}
tfree(pClause->pClause);
}
SCreateTableSQL *tSetCreateSQLElems(tFieldList *pCols, tFieldList *pTags, SSQLToken *pStableName,
......
......@@ -640,8 +640,8 @@ static int tscLaunchMetricSubQueries(SSqlObj *pSql);
// todo merge with callback
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
......@@ -665,7 +665,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
// refactor as one method
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo != NULL);
tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
......@@ -710,7 +710,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
}
}
} else {
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
......@@ -758,11 +758,11 @@ int doProcessSql(SSqlObj *pSql) {
int tscProcessSql(SSqlObj *pSql) {
char * name = NULL;
SSqlRes * pRes = &pSql->res;
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo* pMeterMetaInfo = NULL;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = NULL;
int16_t type = 0;
if (pQueryInfo != NULL) {
......@@ -923,7 +923,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
const uint32_t nBufferSize = (1 << 16); // 64KB
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int32_t numOfVnodes = pMeterMetaInfo->pMetricMeta->numOfVnodes;
assert(numOfVnodes > 0);
......@@ -991,8 +991,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
// todo handle multi-vnode situation
if (pQueryInfo->tsBuf) {
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
}
......@@ -1128,7 +1127,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
} else {
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
(*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
......@@ -1160,7 +1159,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
}
SSqlRes * pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -1177,7 +1176,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
#ifdef _DEBUG_VIEW
printf("received data from vnode: %d rows\n", pRes->numOfRows);
SSrcColumnInfo colInfo[256] = {0};
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pSchema, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
......@@ -1209,7 +1208,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
#ifdef _DEBUG_VIEW
printf("%ld rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
SSrcColumnInfo colInfo[256] = {0};
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pSchema, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
......@@ -1224,8 +1223,8 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
// each result for a vnode is ordered as an independant list,
// then used as an input of loser tree for disk-based merge routine
int32_t ret =
tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType);
int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
pQueryInfo->groupbyExpr.orderType);
if (ret != 0) {
/* set no disk space error info, and abort retry */
return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
......@@ -1241,7 +1240,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
trsupport->pState->numOfTotal, trsupport->pState->numOfCompleted);
SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, trsupport->pState->numOfTotal, pDesc, trsupport->pFinalColModel,
......@@ -1319,7 +1318,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
assert(pQueryInfo->numOfTables == 1);
......@@ -1381,8 +1380,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
trsupport->pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo->pMeterInfo[0]->pMeterMeta != NULL && pNewQueryInfo->pMeterInfo[0]->pMetricMeta != NULL);
tscProcessSql(pNew);
return;
......@@ -1420,7 +1418,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
*((uint64_t *)pMsg) = pSql->res.qhandle;
pMsg += sizeof(pSql->res.qhandle);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
*((uint16_t *)pMsg) = htons(pQueryInfo->type);
pMsg += sizeof(pQueryInfo->type);
......@@ -1451,16 +1449,19 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char * pMsg, *pStart;
int msgLen = 0;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart;
pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->import = pSql->cmd.import;
pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.count); // number of meters to be inserted
pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
......@@ -1493,7 +1494,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
*/
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) {
const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
......@@ -1570,7 +1571,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return -1;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
char * pStart = pCmd->payload + tsRpcHeadSize;
......@@ -1667,7 +1668,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) {
tscError("%p illegal value of number of output columns in query msg: %d", pSql, pQueryInfo->fieldsInfo.numOfOutputCols);
tscError("%p illegal value of number of output columns in query msg: %d", pSql,
pQueryInfo->fieldsInfo.numOfOutputCols);
return -1;
}
......@@ -2214,7 +2216,7 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCreateTableMsg);
SCreateTableSQL* pCreateTableInfo = pInfo->pCreateTableInfo;
SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo;
if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) {
size += sizeof(STagData);
} else {
......@@ -2235,9 +2237,9 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSchema * pSchema;
int size = 0;
SSqlCmd * pCmd = &pSql->cmd;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
// Reallocate the payload size
......@@ -2308,7 +2310,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
return minMsgSize() + sizeof(SMgmtHead) + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) +
TSDB_EXTRA_PAYLOAD_SIZE;
}
......@@ -2320,7 +2322,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int size = 0;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -2337,7 +2339,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db);
pMsg += sizeof(SMgmtHead);
SAlterTableSQL* pAlterInfo = pInfo->pAlterInfo;
SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo;
pAlterTableMsg = (SAlterTableMsg *)pMsg;
strcpy(pAlterTableMsg->meterId, pMeterMetaInfo->name);
......@@ -2406,7 +2408,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
size_t nameLen = strlen(pMeterMetaInfo->name);
......@@ -2431,7 +2433,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
static int tscSetResultPointer(SQueryInfo* pQueryInfo, SSqlRes *pRes) {
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
if (tscCreateResPointerInfo(pQueryInfo, pRes) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
......@@ -2462,7 +2464,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
pRes->code = TSDB_CODE_SUCCESS;
......@@ -2500,9 +2502,9 @@ int tscProcessDescribeTableRsp(SSqlObj *pSql) {
}
int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int32_t numOfRes = 0;
......@@ -2519,7 +2521,7 @@ int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
pRes->code = tscLocalDoReduce(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscSetResultPointer(pQueryInfo, pRes);
......@@ -2577,13 +2579,16 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *tmpData = 0;
if (pSql->cmd.allocSize > 0) {
tmpData = calloc(1, pSql->cmd.allocSize);
if (NULL == tmpData) return -1;
if (NULL == tmpData) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
// STagData is in binary format, strncpy is not available
memcpy(tmpData, pSql->cmd.payload, pSql->cmd.allocSize);
}
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -2597,10 +2602,10 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pInfoMsg = (SMeterInfoMsg *)pMsg;
strcpy(pInfoMsg->meterId, pMeterMetaInfo->name);
pInfoMsg->createFlag = htons((uint16_t)pQueryInfo->defaultVal[0]);
pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0);
pMsg += sizeof(SMeterInfoMsg);
if (pQueryInfo->defaultVal[0] != 0) {
if (pSql->cmd.createOnDemand) {
memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
pMsg += sizeof(STagData);
}
......@@ -2658,7 +2663,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
const int32_t defaultSize =
minMsgSize() + sizeof(SMetricMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
int32_t n = 0;
for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) {
......@@ -2685,7 +2690,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tableIndex = 0;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STagCond *pTagCond = &pQueryInfo->tagCond;
......@@ -2886,7 +2891,7 @@ int tscProcessRetrieveRspFromMgmt(SSqlObj *pSql) {
pRes->data = pRetrieve->data;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
tscSetResultPointer(pQueryInfo, pRes);
if (pRes->numOfRows == 0) {
......@@ -3185,7 +3190,7 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
sizes[k] = pBuf - (char *)pNewMetricMeta;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
for (int32_t i = 0; i < num; ++i) {
char name[TSDB_MAX_TAGS_LEN + 1] = {0};
......@@ -3231,10 +3236,10 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SSchema * pSchema;
char key[20];
SSqlRes * pRes = &pSql->res;
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); //?
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); //?
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -3398,7 +3403,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;
......@@ -3445,26 +3450,25 @@ int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (NULL == pNew) {
tscError("%p malloc failed for new sqlobj to get meter meta", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META;
pNew->cmd.payload = NULL;
pNew->cmd.allocSize = 0;
tscAddSubqueryInfo(&pNew->cmd);
assert(pNew->cmd.numOfClause == 1);
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->defaultVal[0] = pQueryInfo->defaultVal[0]; // flag of create table if not exists
SQueryInfo *pNewQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
pNew->cmd.createOnDemand = pSql->cmd.createOnDemand; // create table if not exists
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p malloc failed for payload to get meter meta", pSql);
free(pNew);
......@@ -3472,10 +3476,11 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo);
SMeterMetaInfo *pNewMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo);
assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
strcpy(pMeterMetaInfo->name, meterId);
memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);
strcpy(pNewMeterMetaInfo->name, pMeterMetaInfo->name);
memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE); // tag information if table does not exists.
tscTrace("%p new pSqlObj:%p to get meterMeta", pSql, pNew);
if (pSql->fp == NULL) {
......@@ -3483,14 +3488,19 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
tsem_init(&pNew->emptyRspSem, 0, 1);
code = tscProcessSql(pNew);
SMeterMetaInfo *pInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, index);
// update cache only on success get metermeta
/*
* Update cache only on succeeding in getting metermeta.
* Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
*/
if (code == TSDB_CODE_SUCCESS) {
pInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, meterId);
pMeterMetaInfo->pMeterMeta = pNewMeterMetaInfo->pMeterMeta;
pNewMeterMetaInfo->pMeterMeta = NULL;
assert(pMeterMetaInfo->pMeterMeta != NULL);
}
tscTrace("%p get meter meta complete, code:%d, pMeterMeta:%p", pSql, code, pInfo->pMeterMeta);
tscTrace("%p get meter meta complete, code:%d, pMeterMeta:%p", pSql, code, pMeterMetaInfo->pMeterMeta);
tscFreeSqlObj(pNew);
} else {
......@@ -3507,14 +3517,12 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
return code;
}
int tscGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
SSqlCmd *pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, index);
int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
assert(strlen(pMeterMetaInfo->name) != 0);
// if the SSqlCmd owns a metermeta, release it first
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, meterId);
pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
if (pMeterMetaInfo->pMeterMeta != NULL) {
SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
......@@ -3529,18 +3537,12 @@ int tscGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
* for async insert operation, release data block buffer before issue new object to get metermeta
* because in metermeta callback function, the tscParse function will generate the submit data blocks
*/
// if (pSql->fp != NULL && pSql->pStream == NULL) {
// tscFreeSqlCmdData(pCmd);
//}
return tscDoGetMeterMeta(pSql, meterId, index);
return doGetMeterMetaFromServer(pSql, pMeterMetaInfo);
}
int tscGetMeterMetaEx(SSqlObj *pSql, char *meterId, bool createIfNotExists) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
pQueryInfo->defaultVal[0] = createIfNotExists ? 1 : 0;
return tscGetMeterMeta(pSql, meterId, 0);
int tscGetMeterMetaEx(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo, bool createIfNotExists) {
pSql->cmd.createOnDemand = createIfNotExists;
return tscGetMeterMeta(pSql, pMeterMetaInfo);
}
/*
......@@ -3568,7 +3570,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
// handle metric meta renew process
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
// enforce the renew metermeta operation in async model
......@@ -3583,10 +3585,11 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta);
}
tscWaitingForCreateTable(pCmd);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
code = tscDoGetMeterMeta(pSql, meterId, 0); // todo ??
code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo); // todo ??
} else {
tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid,
......@@ -3612,7 +3615,7 @@ int tscGetMetricMeta(SSqlObj *pSql) {
bool reqMetricMeta = false;
int32_t subClauseIndex = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};
......@@ -3641,11 +3644,14 @@ int tscGetMetricMeta(SSqlObj *pSql) {
pNew->cmd.command = TSDB_SQL_METRIC;
SQueryInfo *pNewQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
tscAddMeterMetaInfo(pQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
}
if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
......@@ -3653,8 +3659,6 @@ int tscGetMetricMeta(SSqlObj *pSql) {
return code;
}
// the query condition on meter is serialized into payload
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
......
......@@ -298,8 +298,11 @@ int taos_num_fields(TAOS_RES *res) {
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SFieldInfo *pFieldsInfo = &pQueryInfo->fieldsInfo;
if (pQueryInfo == NULL) {
return 0;
}
SFieldInfo *pFieldsInfo = &pQueryInfo->fieldsInfo;
return (pFieldsInfo->numOfOutputCols - pFieldsInfo->numOfHiddenCols);
}
......@@ -993,11 +996,8 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
int code = TSDB_CODE_INVALID_METER_ID;
char *str = (char *)tblNameList;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo == NULL) {
tscAddSubqueryInfo(pCmd);
pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
}
SQueryInfo* pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
......@@ -1034,7 +1034,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
return code;
}
if ((code = setMeterID(pSql, 0, &sToken, 0)) != TSDB_CODE_SUCCESS) {
if ((code = setMeterID(pMeterMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -70,7 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
int code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pSql->res.code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -82,7 +82,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
}
tscTansformSQLFunctionForMetricQuery(pQueryInfo);
tscTansformSQLFunctionForSTableQuery(pQueryInfo);
// failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) {
......@@ -391,6 +391,7 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
pStream->slidingTime = pQueryInfo->nSlidingTime;
pQueryInfo->nAggTimeInterval = 0; // clear the interval value to avoid the force time window split by query processor
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
......@@ -507,8 +508,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return NULL;
}
// TODO later refactor use enum
pSql->cmd.count = 1; // 1 means sql in stream, allowed the sliding clause.
pSql->cmd.inStream = 1; // 1 means sql in stream, allowed the sliding clause.
pRes->code = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
......
......@@ -319,7 +319,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
}
pQueryInfo->interpoType = TSDB_INTERPO_NONE;
memset(pQueryInfo->defaultVal, 0, sizeof(pQueryInfo->defaultVal));
tfree(pQueryInfo->defaultVal);
}
void tscClearSqlMetaInfoForce(SSqlCmd* pCmd) {
......@@ -398,20 +398,20 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
tfree(pSql->sqlstr);
pthread_mutex_unlock(&pObj->mutex);
tfree(pSql->res.pRsp);
pSql->res.row = 0;
pSql->res.numOfRows = 0;
pSql->res.numOfTotal = 0;
tfree(pRes->pRsp);
pRes->row = 0;
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
pSql->res.numOfGroups = 0;
tfree(pSql->res.pGroupRec);
pRes->numOfGroups = 0;
tfree(pRes->pGroupRec);
tscDestroyLocalReducer(pSql);
tfree(pSql->pSubs);
pSql->numOfSubs = 0;
tscDestroyResPointerInfo(pRes);
tfree(pSql->res.pColumnIndex);
tfree(pRes->pColumnIndex);
tscFreeSqlCmdData(pCmd);
}
......@@ -535,7 +535,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
SSqlCmd* pCmd = &pSql->cmd;
assert(pDataBlock->pMeterMeta != NULL);
pCmd->count = pDataBlock->numOfMeters;
pCmd->numOfTablesInSubmit = pDataBlock->numOfMeters;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
......@@ -1548,7 +1548,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);
if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
tscTrace("%p object should be release since all data blocks have been submit", pSql);
......@@ -1664,6 +1664,8 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList));
pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
tfree(pQueryInfo->defaultVal);
}
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
......@@ -1814,15 +1816,18 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo));
pNewQueryInfo->pMeterInfo = NULL;
memset(&pNewQueryInfo->colList, 0, sizeof(pNewQueryInfo->colList));
memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
pNewQueryInfo->pMeterInfo = NULL;
pNewQueryInfo->defaultVal = NULL;
pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->tsBuf = NULL;
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex);
......@@ -1910,8 +1915,8 @@ void tscDoQuery(SSqlObj* pSql) {
tscAddIntoSqlList(pSql);
}
if (pCmd->isInsertFromFile == 1) {
tscProcessMultiVnodesInsertForFile(pSql);
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
tscProcessMultiVnodesInsertFromFile(pSql);
} else {
// pSql may be released in this function if it is a async insertion.
tscProcessSql(pSql);
......
......@@ -208,7 +208,7 @@ extern "C" {
#define TSDB_MAX_RPC_THREADS 5
#define TSDB_QUERY_TYPE_QUERY 0 // normal query
#define TSDB_QUERY_TYPE_NON_TYPE 0x00U // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01U // free qhandle at vnode
/*
......@@ -224,6 +224,13 @@ extern "C" {
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40U // select *,columns... query
#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80U // join sub query at the second stage
#define TSDB_QUERY_TYPE_INSERT 0x100U // insert type
#define TSDB_QUERY_TYPE_IMPORT 0x200U // import data
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
#define TSQL_SO_ASC 1
#define TSQL_SO_DESC 0
......
......@@ -585,6 +585,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
SShellSubmitMsg *pSubmit = &shellSubmit;
SShellSubmitBlock *pBlocks = NULL;
pSubmit->import = htons(pSubmit->import);
pSubmit->vnode = htons(pSubmit->vnode);
pSubmit->numOfSid = htonl(pSubmit->numOfSid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册