diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 77e632fdab50839d220a433d19247fce6f75a5bb..687b5bc689507c22dbeac6f81644f2cca260394c 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -110,7 +110,7 @@ void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex); -int32_t setMeterID(SSqlObj* pSql, int32_t subClauseIndex, SSQLToken* pzTableName, int32_t tableIndex); +int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql); void tscClearInterpInfo(SQueryInfo* pQueryInfo); bool tscIsInsertOrImportData(char* sqlstr); @@ -198,8 +198,8 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, int32_t subClauseIndex, char* keyStr, uint64_t uid); int tscGetMetricMeta(SSqlObj* pSql); -int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex); -int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists); +int tscGetMeterMeta(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo); +int tscGetMeterMetaEx(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo, bool createIfNotExists); void tscResetForNextRetrieve(SSqlRes* pRes); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4d2847aca6fbc5010ef5adc463fb6c3630180d30..f867ae72a1b4d9b7671f0a9192eb4e45d1ae363a 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -55,17 +55,17 @@ typedef struct SSqlGroupbyExpr { } SSqlGroupbyExpr; typedef struct SMeterMetaInfo { - SMeterMeta * pMeterMeta; // metermeta - SMetricMeta *pMetricMeta; // metricmeta - + SMeterMeta * pMeterMeta; // metermeta + SMetricMeta *pMetricMeta; // metricmeta + /* * 1. keep the vnode index during the multi-vnode super table projection query * 2. keep the vnode index for multi-vnode insertion */ - int32_t vnodeIndex; - char name[TSDB_METER_ID_LEN + 1]; // table(super table) name - int16_t numOfTags; // total required tags in query, including groupby tags - int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection + int32_t vnodeIndex; + char name[TSDB_METER_ID_LEN + 1]; // table(super table) name + int16_t numOfTags; // total required tags in query, including groupby tags + int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection } SMeterMetaInfo; /* the structure for sql function in select clause */ @@ -170,23 +170,23 @@ typedef struct SParamInfo { } SParamInfo; typedef struct STableDataBlocks { - char meterId[TSDB_METER_ID_LEN]; - int8_t tsSource; // where does the UNIX timestamp come from, server or client - bool ordered; // if current rows are ordered or not - int64_t vgid; // virtual group id - int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending - int32_t numOfMeters; // number of tables in current submit block - - int32_t rowSize; // row size for current table + char meterId[TSDB_METER_ID_LEN]; + int8_t tsSource; // where does the UNIX timestamp come from, server or client + bool ordered; // if current rows are ordered or not + int64_t vgid; // virtual group id + int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending + int32_t numOfMeters; // number of tables in current submit block + + int32_t rowSize; // row size for current table uint32_t nAllocSize; uint32_t size; - + /* * the metermeta for current table, the metermeta will be used during submit stage, keep a ref * to avoid it to be removed from cache */ - SMeterMeta* pMeterMeta; - + SMeterMeta *pMeterMeta; + union { char *filename; char *pData; @@ -208,79 +208,61 @@ typedef struct SDataBlockList { } SDataBlockList; typedef struct SQueryInfo { - uint16_t type; // query type - char intervalTimeUnit; - + uint16_t type; // query/insert/import type + char intervalTimeUnit; + int64_t etime, stime; int64_t nAggTimeInterval; // aggregation time interval int64_t nSlidingTime; // sliding window in mseconds SSqlGroupbyExpr groupbyExpr; // group by tags info - - SColumnBaseInfo colList; - SFieldInfo fieldsInfo; - SSqlExprInfo exprsInfo; - SLimitVal limit; - SLimitVal slimit; - STagCond tagCond; - SOrderVal order; - int16_t interpoType; // interpolate type - int16_t numOfTables; + + SColumnBaseInfo colList; + SFieldInfo fieldsInfo; + SSqlExprInfo exprsInfo; + SLimitVal limit; + SLimitVal slimit; + STagCond tagCond; + SOrderVal order; + int16_t interpoType; // interpolate type + int16_t numOfTables; SMeterMetaInfo **pMeterInfo; struct STSBuf * tsBuf; - // todo use dynamic allocated memory for defaultVal - int64_t defaultVal[TSDB_MAX_COLUMNS]; // default value for interpolation - char* msg; // pointer to the pCmd->payload to keep error message temporarily + int64_t * defaultVal; // default value for interpolation + char * msg; // pointer to the pCmd->payload to keep error message temporarily } SQueryInfo; +// data source from sql string or from file +enum { + DATA_FROM_SQL_STRING = 1, + DATA_FROM_DATA_FILE = 2, +}; + typedef struct { -// SOrderVal order; - int command; - int count; // TODO refactor + int command; + uint8_t msgType; union { - bool existsCheck; // check if the table exists - bool import; // import/insert type + bool existsCheck; // check if the table exists or not + bool inStream; // denote if current sql is executed in stream or not + bool createOnDemand; // if the table is missing, on-the-fly create it. during getmeterMeta + int8_t dataSourceType; // load data from file or not }; - int8_t isInsertFromFile; // load data from file or not - uint8_t msgType; + union { + int32_t count; + int32_t numOfTablesInSubmit; + }; - /* - * use to keep short request msg and error msg, in such case, SSqlCmd->payload == SSqlCmd->ext; - * create table/query/insert operations will exceed the TSDB_SQLCMD_SIZE. - * - * In such cases, allocate the memory dynamically, and need to free the memory - */ - uint32_t allocSize; - char * payload; - int payloadLen; - short numOfCols; - int64_t globalLimit; - - SQueryInfo **pQueryInfo; - int32_t numOfClause; - -// char intervalTimeUnit; -// int64_t etime, stime; -// int64_t nAggTimeInterval; // aggregation time interval -// int64_t nSlidingTime; // sliding window in mseconds -// SSqlGroupbyExpr groupbyExpr; // group by tags info -// -// SColumnBaseInfo colList; -// SFieldInfo fieldsInfo; -// SSqlExprInfo exprsInfo; -// SLimitVal limit; -// SLimitVal slimit; -// STagCond tagCond; -// int16_t interpoType; // interpolate type -// int16_t numOfTables; -// SMeterMetaInfo **pMeterInfo; -// struct STSBuf * tsBuf; -// // todo use dynamic allocated memory for defaultVal -// int64_t defaultVal[TSDB_MAX_COLUMNS]; // default value for interpolation + short numOfCols; + uint32_t allocSize; + char * payload; + int payloadLen; + int64_t globalLimit; + SQueryInfo **pQueryInfo; + int32_t numOfClause; // submit data blocks branched according to vnode - SDataBlockList * pDataBlocks; + SDataBlockList *pDataBlocks; // for parameter ('?') binding and batch processing int32_t batchSize; @@ -359,8 +341,8 @@ typedef struct _sql_obj { SSqlCmd cmd; SSqlRes res; uint8_t numOfSubs; - char* asyncTblPos; - void* pTableHashList; + char * asyncTblPos; + void * pTableHashList; struct _sql_obj **pSubs; struct _sql_obj * prev, *next; } SSqlObj; @@ -402,7 +384,7 @@ typedef struct { // tscSql API int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); -void tscInitMsgs(); +void tscInitMsgs(); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle); @@ -423,12 +405,12 @@ int taos_retrieve(TAOS_RES *res); * transfer function for metric query in stream computing, the function need to be change * before send query message to vnode */ -int32_t tscTansformSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo); -void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo); +int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo); +void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo); void tscClearSqlMetaInfoForce(SSqlCmd *pCmd); -int32_t tscCreateResPointerInfo(SQueryInfo* pQueryInfo, SSqlRes *pRes); +int32_t tscCreateResPointerInfo(SQueryInfo *pQueryInfo, SSqlRes *pRes); void tscDestroyResPointerInfo(SSqlRes *pRes); void tscFreeSqlCmdData(SSqlCmd *pCmd); @@ -449,14 +431,14 @@ void tscFreeSqlObj(SSqlObj *pObj); void tscCloseTscObj(STscObj *pObj); -void tscProcessMultiVnodesInsert(SSqlObj *pSql); -void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql); -void tscKillMetricQuery(SSqlObj *pSql); -void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); -bool tscIsUpdateQuery(STscObj *pObj); -bool tscHasReachLimitation(SSqlObj* pSql); +void tscProcessMultiVnodesInsert(SSqlObj *pSql); +void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); +void tscKillMetricQuery(SSqlObj *pSql); +void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); +bool tscIsUpdateQuery(STscObj *pObj); +bool tscHasReachLimitation(SSqlObj *pSql); -char* tscGetErrorMsgPayload(SSqlCmd* pCmd); +char *tscGetErrorMsgPayload(SSqlCmd *pCmd); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 2a2775f926ea10584375d6147a19641f1f682cad..d84ad043654d6f06ab04f29d4f56bf031efd6bbd 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -413,13 +413,13 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) SSqlCmd *pCmd = &pSql->cmd; int32_t code = TSDB_CODE_SUCCESS; - assert(!pCmd->isInsertFromFile && pSql->signature == pSql); + assert(pCmd->dataSourceType != 0 && pSql->signature == pSql); int32_t index = 0; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index); - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index, 0); - assert(pQueryInfo->numOfTables == 1); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2); SDataBlockList *pDataBlocks = pCmd->pDataBlocks; if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) { @@ -456,7 +456,6 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { SSqlObj *pSql = (SSqlObj *)param; if (pSql == NULL || pSql->signature != pSql) return; - STscObj *pObj = pSql->pTscObj; SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -480,7 +479,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); assert(pMeterMetaInfo->pMeterMeta == NULL); - tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + tscGetMeterMeta(pSql, pMeterMetaInfo); code = tscSendMsgToServer(pSql); if (code != 0) { pRes->code = code; @@ -513,7 +512,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { tscTrace("%p get metricMeta during metric query successfully", pSql); - code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + code = tscGetMeterMeta(pSql, pMeterMetaInfo); pRes->code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; @@ -529,12 +528,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { } else { // stream computing SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); - code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + code = tscGetMeterMeta(pSql, pMeterMetaInfo); pRes->code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; - pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { code = tscGetMetricMeta(pSql); pRes->code = code; @@ -557,7 +555,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { */ SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - tscTansformSQLFunctionForMetricQuery(pQueryInfo); + tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscIncStreamExecutionCount(pSql->pStream); } else { tscTrace("%p get meterMeta/metricMeta successfully", pSql); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index fd91a477421323f30c7258dc9fcd06a263a97863..4a6c158ee37936217a71ee1bbe8e494ca85fcf40 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -18,9 +18,8 @@ #define _XOPEN_SOURCE -#include -#include "os.h" #include "ihash.h" +#include "os.h" #include "tscSecondaryMerge.h" #include "tscUtil.h" #include "tschemautil.h" @@ -72,8 +71,6 @@ static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) { } int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) { - //char * token; //fang not used - //int tokenlen; //fang not used int32_t index = 0; SSQLToken sToken; int64_t interval; @@ -116,13 +113,12 @@ int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int1 index = 0; sToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL); pTokenEnd += index; - + if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) { - index = 0; valueToken = tStrGetToken(pTokenEnd, &index, false, 0, NULL); pTokenEnd += index; - + if (valueToken.n < 2) { return tscInvalidSQLErrMsg(error, "value expected in timestamp", sToken.z); } @@ -130,7 +126,7 @@ int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int1 if (getTimestampInUsFromStr(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } - + if (timePrec == TSDB_TIME_PRECISION_MILLI) { interval /= 1000; } @@ -153,8 +149,8 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, int64_t iv; int32_t numType; char * endptr = NULL; - errno = 0; // clear the previous existed error information - + errno = 0; // clear the previous existed error information + switch (pSchema->type) { case TSDB_DATA_TYPE_BOOL: { // bool if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { @@ -194,7 +190,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z); } - *((int8_t *)payload) = (int8_t) iv; + *((int8_t *)payload) = (int8_t)iv; } break; @@ -307,11 +303,11 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, // binary data cannot be null-terminated char string, otherwise the last char of the string is lost if (pToken->type == TK_NULL) { *payload = TSDB_DATA_BINARY_NULL; - } else { // too long values will return invalid sql, not be truncated automatically + } else { // too long values will return invalid sql, not be truncated automatically if (pToken->n > pSchema->bytes) { return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z); } - + strncpy(payload, pToken->z, pToken->n); } @@ -325,7 +321,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) { char buf[512] = {0}; snprintf(buf, 512, "%s", strerror(errno)); - + return tscInvalidSQLErrMsg(msg, buf, pToken->z); } } @@ -343,7 +339,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) { return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z); } - + *((int64_t *)payload) = temp; } @@ -375,7 +371,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start } else { if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) { return -1; // client time/server time can not be mixed - + } else if (pDataBlocks->tsSource == -1) { pDataBlocks->tsSource = TSDB_USE_CLI_TS; } @@ -390,9 +386,9 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start } int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error, - int16_t timePrec, int32_t *code, char* tmpTokenBuf) { - int32_t index = 0; - //bool isPrevOptr; //fang, never used + int16_t timePrec, int32_t *code, char *tmpTokenBuf) { + int32_t index = 0; + // bool isPrevOptr; //fang, never used SSQLToken sToken = {0}; char * payload = pDataBlocks->pData + pDataBlocks->size; @@ -400,8 +396,8 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ int32_t rowSize = 0; for (int i = 0; i < spd->numOfAssignedCols; ++i) { // the start position in data block buffer of current value in sql - char * start = payload + spd->elems[i].offset; - int16_t colIndex = spd->elems[i].colIndex; + char * start = payload + spd->elems[i].offset; + int16_t colIndex = spd->elems[i].colIndex; SSchema *pSchema = schema + colIndex; rowSize += pSchema->bytes; @@ -414,7 +410,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) { continue; } - + strcpy(error, "client out of memory"); *code = TSDB_CODE_CLI_OUT_OF_MEMORY; return -1; @@ -431,10 +427,10 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ // Remove quotation marks if (TK_STRING == sToken.type) { // delete escape character: \\, \', \" - char delim = sToken.z[0]; + char delim = sToken.z[0]; int32_t cnt = 0; int32_t j = 0; - for (int32_t k = 1; k < sToken.n - 1; ++k) { + for (int32_t k = 1; k < sToken.n - 1; ++k) { if (sToken.z[k] == delim || sToken.z[k] == '\\') { if (sToken.z[k + 1] == delim) { cnt++; @@ -444,13 +440,13 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ continue; } } - + tmpTokenBuf[j] = sToken.z[k]; j++; } - tmpTokenBuf[j] = 0; + tmpTokenBuf[j] = 0; sToken.z = tmpTokenBuf; - sToken.n -= 2 + cnt; + sToken.n -= 2 + cnt; } bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); @@ -472,7 +468,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ char *ptr = payload; for (int32_t i = 0; i < spd->numOfCols; ++i) { - if (!spd->hasVal[i]) { // current column do not have any value to insert, set it to null + if (!spd->hasVal[i]) { // current column do not have any value to insert, set it to null setNull(ptr, schema[i].type, schema[i].bytes); } @@ -497,7 +493,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) { } int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMeta, int maxRows, - SParsedDataColInfo *spd, char *error, int32_t *code, char* tmpTokenBuf) { + SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) { int32_t index = 0; SSQLToken sToken; @@ -520,17 +516,17 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe *str += index; if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) { int32_t tSize = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize); - if (0 == tSize) { //TODO pass the correct error code to client + if (0 == tSize) { // TODO pass the correct error code to client strcpy(error, "client out of memory"); *code = TSDB_CODE_CLI_OUT_OF_MEMORY; return -1; } - + maxRows += tSize; } int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf); - if (len <= 0) { // error message has been set in tsParseOneRowData + if (len <= 0) { // error message has been set in tsParseOneRowData return -1; } @@ -574,7 +570,7 @@ static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) { size_t remain = pDataBlock->nAllocSize - pDataBlock->size; const int factor = 5; - uint32_t nAllocSizeOld = pDataBlock->nAllocSize; + uint32_t nAllocSizeOld = pDataBlock->nAllocSize; // expand the allocated size if (remain < rowSize * factor) { @@ -588,7 +584,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) { pDataBlock->pData = tmp; memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size); } else { - //assert(false); + // assert(false); // do nothing pDataBlock->nAllocSize = nAllocSizeOld; return 0; @@ -656,18 +652,18 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char STableDataBlocks *dataBuf = NULL; int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name, &dataBuf); + sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { return ret; } - + int32_t maxNumOfRows = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize); if (0 == maxNumOfRows) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - + int32_t code = TSDB_CODE_INVALID_SQL; - char* tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \" + char * tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \" if (NULL == tmpTokenBuf) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } @@ -679,7 +675,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char } for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) { - SParamInfo* param = dataBuf->params + i; + SParamInfo *param = dataBuf->params + i; if (param->idx == -1) { param->idx = pCmd->numOfParams++; param->offset -= sizeof(SShellSubmitBlock); @@ -700,16 +696,20 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char return TSDB_CODE_SUCCESS; } -static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) { +static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { int32_t index = 0; - SSQLToken sToken; - SSQLToken tableToken; + SSQLToken sToken = {0}; + SSQLToken tableToken = {0}; int32_t code = TSDB_CODE_SUCCESS; - - SSqlCmd * pCmd = &pSql->cmd; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + + const int32_t TABLE_INDEX = 0; + const int32_t STABLE_INDEX = 1; + + SSqlCmd * pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); char *sql = *sqlstr; + // get the token of specified table index = 0; tableToken = tStrGetToken(sql, &index, false, 0, NULL); @@ -746,41 +746,54 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) { if (numOfColList == 0 && cstart != NULL) { return TSDB_CODE_INVALID_SQL; } - - if (sToken.type == TK_USING) { // create table if not exists + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, TABLE_INDEX); + + if (sToken.type == TK_USING) { // create table if not exists according to the super table index = 0; sToken = tStrGetToken(sql, &index, false, 0, NULL); sql += index; STagData *pTag = (STagData *)pCmd->payload; memset(pTag, 0, sizeof(STagData)); - setMeterID(pSql, 0, &sToken, 0); + + /* + * the source super table is moved to the secondary position of the pMeterMetaInfo list + */ + if (pQueryInfo->numOfTables < 2) { + tscAddEmptyMeterMetaInfo(pQueryInfo); + } - strncpy(pTag->name, pMeterMetaInfo->name, TSDB_METER_ID_LEN); - code = tscGetMeterMeta(pSql, pTag->name, 0); + SMeterMetaInfo *pSTableMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, STABLE_INDEX); + setMeterID(pSTableMeterMetaInfo, &sToken, pSql); + + strncpy(pTag->name, pSTableMeterMetaInfo->name, TSDB_METER_ID_LEN); + code = tscGetMeterMeta(pSql, pSTableMeterMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } - if (!UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { + if (!UTIL_METER_IS_SUPERTABLE(pSTableMeterMetaInfo)) { return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z); } - SSchema *pTagSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta); + SSchema *pTagSchema = tsGetTagSchema(pSTableMeterMetaInfo->pMeterMeta); index = 0; sToken = tStrGetToken(sql, &index, false, 0, NULL); sql += index; - SParsedDataColInfo spd = {0}; - uint8_t numOfTags = pMeterMetaInfo->pMeterMeta->numOfTags; + SParsedDataColInfo spd = {0}; + + uint8_t numOfTags = pSTableMeterMetaInfo->pMeterMeta->numOfTags; spd.numOfCols = numOfTags; // if specify some tags column if (sToken.type != TK_LP) { tscSetAssignedColumnInfo(&spd, pTagSchema, numOfTags); } else { - /* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen) tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */ + /* insert into tablename (col1, col2,..., coln) using superTableName (tagName1, tagName2, ..., tagNamen) + * tags(tagVal1, tagVal2, ..., tagValn) values(v1, v2,... vn); */ int16_t offset[TSDB_MAX_COLUMNS] = {0}; for (int32_t t = 1; t < numOfTags; ++t) { offset[t] = offset[t - 1] + pTagSchema[t - 1].bytes; @@ -807,14 +820,14 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) { for (int32_t t = 0; t < numOfTags; ++t) { if (strncmp(sToken.z, pTagSchema[t].name, sToken.n) == 0 && strlen(pTagSchema[t].name) == sToken.n) { SParsedColElem *pElem = &spd.elems[spd.numOfAssignedCols++]; - pElem->offset = offset[t]; + pElem->offset = offset[t]; pElem->colIndex = t; if (spd.hasVal[t] == true) { return tscInvalidSQLErrMsg(pCmd->payload, "duplicated tag name", sToken.z); } - spd.hasVal[t] = true; + spd.hasVal[t] = true; findColumnIndex = true; break; } @@ -833,7 +846,7 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) { sToken = tStrGetToken(sql, &index, false, 0, NULL); sql += index; } - + if (sToken.type != TK_TAGS) { return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z); } @@ -841,9 +854,9 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) { uint32_t ignoreTokenTypes = TK_LP; uint32_t numOfIgnoreToken = 1; for (int i = 0; i < spd.numOfAssignedCols; ++i) { - char* tagVal = pTag->data + spd.elems[i].offset; + char * tagVal = pTag->data + spd.elems[i].offset; int16_t colIndex = spd.elems[i].colIndex; - + index = 0; sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes); sql += index; @@ -859,13 +872,14 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) { sToken.n -= 2; } - code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, pMeterMetaInfo->pMeterMeta->precision); + code = tsParseOneColumnData(&pTagSchema[colIndex], &sToken, tagVal, pCmd->payload, &sql, false, + pSTableMeterMetaInfo->pMeterMeta->precision); if (code != TSDB_CODE_SUCCESS) { return code; } - if ((pTagSchema[colIndex].type == TSDB_DATA_TYPE_BINARY || - pTagSchema[colIndex].type == TSDB_DATA_TYPE_NCHAR) && sToken.n > pTagSchema[colIndex].bytes) { + if ((pTagSchema[colIndex].type == TSDB_DATA_TYPE_BINARY || pTagSchema[colIndex].type == TSDB_DATA_TYPE_NCHAR) && + sToken.n > pTagSchema[colIndex].bytes) { return tscInvalidSQLErrMsg(pCmd->payload, "string too long", sToken.z); } } @@ -880,34 +894,34 @@ static int32_t tscParseSqlForCreateTableOnDemand(char **sqlstr, SSqlObj *pSql) { // 2. set the null value for the columns that do not assign values if (spd.numOfAssignedCols < spd.numOfCols) { char *ptr = pTag->data; - + for (int32_t i = 0; i < spd.numOfCols; ++i) { - if (!spd.hasVal[i]) { // current tag column do not have any value to insert, set it to null + if (!spd.hasVal[i]) { // current tag column do not have any value to insert, set it to null setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes); } - + ptr += pTagSchema[i].bytes; - } + } } if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) { return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr); } - int32_t ret = setMeterID(pSql, 0, &tableToken, 0); + int32_t ret = setMeterID(pMeterMetaInfo, &tableToken, pSql); if (ret != TSDB_CODE_SUCCESS) { return ret; } createTable = true; - code = tscGetMeterMetaEx(pSql, pMeterMetaInfo->name, true); + code = tscGetMeterMetaEx(pSql, pMeterMetaInfo, true); } else { if (cstart != NULL) { sql = cstart; } else { sql = sToken.z; } - code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + code = tscGetMeterMeta(pSql, pMeterMetaInfo); } int32_t len = cend - cstart + 1; @@ -932,6 +946,15 @@ int validateTableName(char *tblName, int len) { return tscValidateName(&token); } +static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) { + if (pCmd->dataSourceType != 0 && pCmd->dataSourceType != type) { + return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql); + } + + pCmd->dataSourceType = type; + return TSDB_CODE_SUCCESS; +} + /** * usage: insert into table1 values() () table2 values()() * @@ -943,18 +966,20 @@ int validateTableName(char *tblName, int len) { */ int doParseInsertSql(SSqlObj *pSql, char *str) { SSqlCmd *pCmd = &pSql->cmd; - + int32_t totalNum = 0; - SQueryInfo* pQueryInfo = NULL; - SMeterMetaInfo* pMeterMetaInfo = NULL; - - int32_t code = tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo); - + int32_t code = TSDB_CODE_SUCCESS; + + SMeterMetaInfo *pMeterMetaInfo = NULL; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + assert(pQueryInfo != NULL); + if (pQueryInfo->numOfTables == 0) { pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); } else { pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1); +// assert(pQueryInfo->numOfTables == 1); } if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { @@ -962,8 +987,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { } if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) { - pSql->pTableHashList = taosInitHashTable(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); - + pSql->pTableHashList = taosInitHashTable(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) { code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -972,22 +997,32 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { } else { str = pSql->asyncTblPos; } - + tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks); while (1) { - int32_t index = 0; + int32_t index = 0; SSQLToken sToken = tStrGetToken(str, &index, false, 0, NULL); - if (sToken.n == 0) { // parse file, do not release the STableDataBlock - if (pCmd->isInsertFromFile == 1) { + + // no data in the sql string anymore. + if (sToken.n == 0) { + /* + * if the data is from the data file, no data has been generated yet. So, there no data to + * merge or submit, save the file path and parse the file in other routines. + */ + if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { goto _clean; } - if (totalNum > 0) { - break; - } else { // no data in current sql string, error + /* + * if no data has been generated during parsing the sql string, error msg will return + * Otherwise, create the first submit block and submit to virtual node. + */ + if (totalNum == 0) { code = TSDB_CODE_INVALID_SQL; goto _error_clean; + } else { + break; } } @@ -999,22 +1034,21 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } - //TODO refactor - if ((code = setMeterID(pSql, 0, &sToken, 0)) != TSDB_CODE_SUCCESS) { + if ((code = setMeterID(pMeterMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) { goto _error_clean; } void *fp = pSql->fp; - if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) { + if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { + /* + * For async insert, after get the metermeta from server, the sql string will not be + * parsed using the new metermeta to avoid the overhead cause by get metermeta data information. + * And during the getMeterMetaCallback function, the sql string will be parsed from the + * interrupted position. + */ if (fp != NULL) { - //goto _clean; return code; } else { - /* - * for async insert, the free data block operations, which is tscDestroyBlockArrayList, - * must be executed before launch another threads to get metermeta, since the - * later ops may manipulate SSqlObj through another thread in getMeterMetaCallback function. - */ goto _error_clean; } } @@ -1027,8 +1061,9 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { index = 0; sToken = tStrGetToken(str, &index, false, 0, NULL); str += index; + if (sToken.n == 0) { - code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z); + code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z); goto _error_clean; } @@ -1038,13 +1073,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { tscSetAssignedColumnInfo(&spd, pSchema, pMeterMetaInfo->pMeterMeta->numOfColumns); - if (pCmd->isInsertFromFile == -1) { - pCmd->isInsertFromFile = 0; - } else { - if (pCmd->isInsertFromFile == 1) { - code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z); - goto _error_clean; - } + if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { + goto _error_clean; } /* @@ -1056,13 +1086,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } } else if (sToken.type == TK_FILE) { - if (pCmd->isInsertFromFile == -1) { - pCmd->isInsertFromFile = 1; - } else { - if (pCmd->isInsertFromFile == 0) { - code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z); - goto _error_clean; - } + if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) { + goto _error_clean; } index = 0; @@ -1091,7 +1116,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (ret != TSDB_CODE_SUCCESS) { goto _error_clean; } - + tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); strcpy(pDataBlock->filename, fname); } else if (sToken.type == TK_LP) { @@ -1099,10 +1124,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, 0, 0)->pMeterMeta; SSchema * pSchema = tsGetSchema(pMeterMeta); - if (pCmd->isInsertFromFile == -1) { - pCmd->isInsertFromFile = 0; - } else if (pCmd->isInsertFromFile == 1) { - code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sToken.z); + if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { goto _error_clean; } @@ -1183,7 +1205,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (pCmd->numOfParams > 0) { goto _clean; } - + // submit to more than one vnode if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgid @@ -1197,7 +1219,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { } pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); - + // set the next sent data vnode index in data block arraylist pMeterMetaInfo->vnodeIndex = 1; } else { @@ -1223,24 +1245,29 @@ int tsParseInsertSql(SSqlObj *pSql) { int32_t index = 0; SSqlCmd *pCmd = &pSql->cmd; - char* sql = pSql->sqlstr; - - SSQLToken sToken = tStrGetToken(sql, &index, false, 0, NULL); - + + SSQLToken sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); assert(sToken.type == TK_INSERT || sToken.type == TK_IMPORT); - pCmd->import = (sToken.type == TK_IMPORT); - - sToken = tStrGetToken(sql, &index, false, 0, NULL); + + pCmd->count = 0; + pCmd->command = TSDB_SQL_INSERT; + + SQueryInfo *pQueryInfo = NULL; + tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo); + + if (sToken.type == TK_INSERT) { + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT); + } else { + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_IMPORT); + } + + sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); if (sToken.type != TK_INTO) { return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z); } - - pCmd->count = 0; - pCmd->command = TSDB_SQL_INSERT; - pCmd->isInsertFromFile = -1; + pSql->res.numOfRows = 0; - - return doParseInsertSql(pSql, sql + index); + return doParseInsertSql(pSql, pSql->sqlstr + index); } int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) { @@ -1270,7 +1297,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) { } else { ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE); if (TSDB_CODE_SUCCESS != ret) return ret; - + SSqlInfo SQLInfo = {0}; tSQLParse(&SQLInfo, pSql->sqlstr); @@ -1331,12 +1358,12 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { pCmd->pDataBlocks = tscCreateBlockArrayList(); STableDataBlocks *pTableDataBlock = NULL; - int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), + int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name, &pTableDataBlock); if (ret != TSDB_CODE_SUCCESS) { return -1; } - + tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); maxRows = tscAllocateMemIfNeed(pTableDataBlock, rowSize); @@ -1351,7 +1378,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { while ((readLen = getline(&line, &n, fp)) != -1) { // line[--readLen] = '\0'; if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0; - if (readLen == 0) continue; //fang, <= to == + if (readLen == 0) continue; // fang, <= to == char *lineptr = line; strtolower(line, line); @@ -1359,15 +1386,16 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { if (numOfRows >= maxRows || pTableDataBlock->size + pMeterMeta->rowSize >= pTableDataBlock->nAllocSize) { uint32_t tSize = tscAllocateMemIfNeed(pTableDataBlock, pMeterMeta->rowSize); if (0 == tSize) return (-TSDB_CODE_CLI_OUT_OF_MEMORY); - maxRows += tSize; + maxRows += tSize; } - len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, pMeterMeta->precision, &code, tmpTokenBuf); + len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, pMeterMeta->precision, &code, + tmpTokenBuf); if (len <= 0 || pTableDataBlock->numOfParams > 0) { pSql->res.code = code; return (-code); } - + pTableDataBlock->size += len; count++; @@ -1425,7 +1453,7 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) { int32_t code = TSDB_CODE_SUCCESS; /* the first block has been sent to server in processSQL function */ - assert(pCmd->isInsertFromFile != -1 && pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL); + assert(pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL); if (pMeterMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) { SDataBlockList *pDataBlocks = pCmd->pDataBlocks; @@ -1437,7 +1465,8 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) { } if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { - tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex, pDataBlocks->nSize); + tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex, + pDataBlocks->nSize); continue; } @@ -1450,17 +1479,19 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) { } // multi-vnodes insertion in sync query model -void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) { +void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; if (pCmd->command != TSDB_SQL_INSERT) { return; } - SMeterMetaInfo * pInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + STableDataBlocks *pDataBlock = NULL; int32_t affected_rows = 0; - assert(pCmd->isInsertFromFile == 1 && pCmd->pDataBlocks != NULL); + assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL); SDataBlockList *pDataBlockList = pCmd->pDataBlocks; pCmd->pDataBlocks = NULL; @@ -1471,7 +1502,7 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) { if (pDataBlock == NULL) { continue; } - + if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) { tscError("%p failed to malloc when insert file", pSql); continue; @@ -1486,16 +1517,16 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) { continue; } - strncpy(pInfo->name, pDataBlock->meterId, TSDB_METER_ID_LEN); + strncpy(pMeterMetaInfo->name, pDataBlock->meterId, TSDB_METER_ID_LEN); memset(pDataBlock->pData, 0, pDataBlock->nAllocSize); - int32_t ret = tscGetMeterMeta(pSql, pInfo->name, 0); + int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo); if (ret != TSDB_CODE_SUCCESS) { tscError("%p get meter meta failed, abort", pSql); continue; } - - char* tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \" + + char *tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \" if (NULL == tmpTokenBuf) { tscError("%p calloc failed", pSql); continue; @@ -1503,7 +1534,7 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql) { int nrows = tscInsertDataFromFile(pSql, fp, tmpTokenBuf); free(tmpTokenBuf); - + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); if (nrows < 0) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 57f417d2ad0aed67a979da85c9c7a6411a081315..138dd96a0642f782a41bd66b4e4ad7bf12ffd91f 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -65,8 +65,8 @@ static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t na static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName); static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem); -static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, int8_t type, - char* fieldName); +static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, + int8_t type, char* fieldName); static int32_t changeFunctionID(int32_t optr, int16_t* functionId); static int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isMetric); @@ -104,7 +104,7 @@ static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex); static int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuerySql); static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql); static int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); -static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIndex* pIndex); +static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t optrToString(tSQLExpr* pExpr, char** exprString); static int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); @@ -127,7 +127,7 @@ static int32_t invalidSqlErrMsg(char* dstBuffer, const char* errMsg) { return tscInvalidSQLErrMsg(dstBuffer, errMsg, NULL); } -static int32_t tscQueryOnlyMetricTags(SQueryInfo *pQueryInfo, bool* queryOnMetricTags) { +static int32_t tscQueryOnlyMetricTags(SQueryInfo* pQueryInfo, bool* queryOnMetricTags) { assert(QUERY_IS_STABLE_QUERY(pQueryInfo->type)); *queryOnMetricTags = true; @@ -198,7 +198,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return TSDB_CODE_APP_ERROR; } - SSqlCmd* pCmd = &(pSql->cmd); + SSqlCmd* pCmd = &(pSql->cmd); SQueryInfo* pQueryInfo = NULL; if (!pInfo->valid) { @@ -206,9 +206,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } int32_t code = tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo); - + + assert(pQueryInfo->numOfTables == 0); SMeterMetaInfo* pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); - + pCmd->command = pInfo->type; switch (pInfo->type) { @@ -237,7 +238,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } else if (pInfo->type == TSDB_SQL_DROP_TABLE) { assert(pInfo->pDCLInfo->nTokens == 1); - if (setMeterID(pSql, 0, pzName, 0) != TSDB_CODE_SUCCESS) { + if (setMeterID(pMeterMetaInfo, pzName, pSql) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } } else if (pInfo->type == TSDB_SQL_DROP_DNODE) { @@ -370,11 +371,11 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - if (setMeterID(pSql, 0, pToken, 0) != TSDB_CODE_SUCCESS) { + if (setMeterID(pMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - return tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + return tscGetMeterMeta(pSql, pMeterMetaInfo); } case TSDB_SQL_CFG_DNODE: { @@ -565,7 +566,7 @@ static bool isTopBottomQuery(SQueryInfo* pQueryInfo) { int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { const char* msg1 = "invalid query expression"; const char* msg2 = "interval cannot be less than 10 ms"; - + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); if (pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0) { @@ -624,11 +625,11 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE); SColumnList ids = getColumnList(1, 0, PRIMARYKEY_TIMESTAMP_COL_INDEX); - + return insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName); } -int32_t setSlidingClause(SQueryInfo *pQueryInfo, SQuerySQL* pQuerySql) { +int32_t setSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { const char* msg0 = "sliding value too small"; const char* msg1 = "sliding value no larger than the interval value"; @@ -653,12 +654,11 @@ int32_t setSlidingClause(SQueryInfo *pQueryInfo, SQuerySQL* pQuerySql) { return TSDB_CODE_SUCCESS; } -int32_t setMeterID(SSqlObj* pSql, int32_t subClauseIndex, SSQLToken* pzTableName, int32_t tableIndex) { +int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql) { const char* msg = "name too long"; - SSqlCmd* pCmd = &pSql->cmd; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, subClauseIndex, tableIndex); - int32_t code = TSDB_CODE_SUCCESS; + SSqlCmd* pCmd = &pSql->cmd; + int32_t code = TSDB_CODE_SUCCESS; if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path @@ -1032,8 +1032,8 @@ int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isSTable const char* msg1 = "invalid column name/illegal column type in arithmetic expression"; const char* msg2 = "functions can not be mixed up"; const char* msg3 = "not support query expression"; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + for (int32_t i = 0; i < pSelection->nExpr; ++i) { int32_t outputIndex = pQueryInfo->fieldsInfo.numOfOutputCols; tSQLExprItem* pItem = &pSelection->a[i]; @@ -1047,7 +1047,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isSTable // if the name of column is quoted, remove it and set the right information for later process extractColumnNameFromString(pItem); - + pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; // select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2 @@ -1124,7 +1124,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isSTable * transfer sql functions that need secondary merge into another format * in dealing with metric queries such as: count/first/last */ - tscTansformSQLFunctionForMetricQuery(pQueryInfo); + tscTansformSQLFunctionForSTableQuery(pQueryInfo); if (hasUnsupportFunctionsForMetricQuery(pQueryInfo)) { return TSDB_CODE_INVALID_SQL; @@ -1215,10 +1215,10 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, pExpr->resType, colName); } -void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, - SSchema* pColSchema, int16_t flag) { - SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type, pColSchema->bytes, - pColSchema->bytes); +void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, + SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) { + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type, + pColSchema->bytes, pColSchema->bytes); SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex); if (TSDB_COL_IS_TAG(flag)) { @@ -1294,7 +1294,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN}; strcpy(colSchema.name, TSQL_TBNAME_L); - + pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY; tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true); } else { @@ -1489,7 +1489,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt if (optr == TK_DIFF) { colIdx += 1; SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; - tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE); + tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, + TSDB_KEYSIZE); SColumnList ids = getColumnList(1, 0, 0); insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName); @@ -1599,7 +1600,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt for (int32_t i = 0; i < pMeterMetaInfo->pMeterMeta->numOfColumns; ++i) { SColumnIndex index = {.tableIndex = j, .columnIndex = i}; - if (setExprInfoForFunctions(pQueryInfo, pSchema, functionID, pItem->aliasName, colIdx + i + j, &index) != 0) { + if (setExprInfoForFunctions(pQueryInfo, pSchema, functionID, pItem->aliasName, colIdx + i + j, &index) != + 0) { return TSDB_CODE_INVALID_SQL; } } @@ -1778,7 +1780,7 @@ static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SSQLToken return columnIndex; } -int32_t doGetColumnIndexByName(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIndex* pIndex) { +int32_t doGetColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { const char* msg0 = "ambiguous column name"; const char* msg1 = "invalid column name"; @@ -1849,7 +1851,7 @@ int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnInd return TSDB_CODE_SUCCESS; } -int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIndex* pIndex) { +int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { SSQLToken tableToken = {0}; extractTableNameFromToken(pToken, &tableToken); @@ -1860,7 +1862,7 @@ int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIn return TSDB_CODE_SUCCESS; } -int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo *pQueryInfo, SColumnIndex* pIndex) { +int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { if (pQueryInfo->pMeterInfo == NULL || pQueryInfo->numOfTables == 0) { return TSDB_CODE_INVALID_SQL; } @@ -2057,7 +2059,7 @@ bool validateIpAddress(const char* ip, size_t size) { return ipAddr != INADDR_NONE; } -int32_t tscTansformSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) { +int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); if (pMeterMetaInfo->pMeterMeta == NULL || !UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { @@ -2359,8 +2361,8 @@ static SColumnFilterInfo* addColumnFilterInfo(SColumnBase* pColumn) { return pColFilterInfo; } -static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterInfo* pColumnFilter, SColumnIndex* columnIndex, - tSQLExpr* pExpr) { +static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterInfo* pColumnFilter, + SColumnIndex* columnIndex, tSQLExpr* pExpr) { const char* msg = "not supported filter condition"; tSQLExpr* pRight = pExpr->pRight; @@ -3147,7 +3149,8 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S return ret; } -int32_t getQueryCondExpr(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SCondExpr* pCondExpr, int32_t* type, int32_t parentOptr) { +int32_t getQueryCondExpr(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SCondExpr* pCondExpr, int32_t* type, + int32_t parentOptr) { if (pExpr == NULL) { return TSDB_CODE_SUCCESS; } @@ -3276,7 +3279,8 @@ int tableNameCompar(const void* lhs, const void* rhs) { return ret > 0 ? 1 : -1; } -static int32_t setTableCondForMetricQuery(SQueryInfo *pQueryInfo, const char* account, tSQLExpr* pExpr, int16_t tableCondIndex, SStringBuilder* sb) { +static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr, + int16_t tableCondIndex, SStringBuilder* sb) { const char* msg = "table name too long"; if (pExpr == NULL) { @@ -3433,7 +3437,7 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { // for stable join, tag columns - // must be present for join + // must be present for join if (pCondExpr->pJoinExpr == NULL) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -3524,7 +3528,7 @@ int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) { int32_t ret = TSDB_CODE_SUCCESS; - SSqlCmd* pCmd = &pSql->cmd; + SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); pQueryInfo->stime = 0; pQueryInfo->etime = INT64_MAX; @@ -3718,14 +3722,22 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { tVariantListItem* pItem = &pFillToken->a[0]; const int32_t START_INTERPO_COL_IDX = 1; - const char* msg = "illegal value or data overflow"; - const char* msg1 = "value is expected"; - const char* msg2 = "invalid fill option"; + + const char* msg = "illegal value or data overflow"; + const char* msg1 = "value is expected"; + const char* msg2 = "invalid fill option"; if (pItem->pVar.nType != TSDB_DATA_TYPE_BINARY) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } + if (pQueryInfo->defaultVal == NULL) { + pQueryInfo->defaultVal = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(int64_t)); + if (pQueryInfo->defaultVal == NULL) { + return TSDB_CODE_CLI_OUT_OF_MEMORY; + } + } + if (strncasecmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) { pQueryInfo->interpoType = TSDB_INTERPO_NONE; } else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) { @@ -3737,7 +3749,6 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { } else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) { pQueryInfo->interpoType = TSDB_INTERPO_PREV; } else if (strncasecmp(pItem->pVar.pz, "linear", 6) == 0 && pItem->pVar.nLen == 6) { - // not support yet pQueryInfo->interpoType = TSDB_INTERPO_LINEAR; } else if (strncasecmp(pItem->pVar.pz, "value", 5) == 0 && pItem->pVar.nLen == 5) { pQueryInfo->interpoType = TSDB_INTERPO_SET_VALUE; @@ -3766,15 +3777,16 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { for (int32_t i = startPos; i < numOfFillVal; ++i, ++j) { TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i); - + + if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { + setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes); + continue; + } + int32_t ret = tVariantDump(&pFillToken->a[j].pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type); if (ret != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg); } - - if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { - setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes); - } } if ((pFillToken->nExpr < pQueryInfo->fieldsInfo.numOfOutputCols) || @@ -3897,7 +3909,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema if (pExpr->colInfo.colIdx != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } - + pQueryInfo->order.order = pQuerySql->pSortOrder->a[0].sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; return TSDB_CODE_SUCCESS; @@ -3948,12 +3960,12 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema if (pExpr->colInfo.colIdx != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } - + pQueryInfo->order.order = pQuerySql->pSortOrder->a[0].sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; return TSDB_CODE_SUCCESS; } - + pQueryInfo->order.order = pQuerySql->pSortOrder->a[0].sortOrder; } @@ -3972,19 +3984,19 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { SSqlCmd* pCmd = &pSql->cmd; SAlterTableSQL* pAlterSQL = pInfo->pAlterInfo; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, DEFAULT_TABLE_INDEX); if (tscValidateName(&(pAlterSQL->name)) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } - if (setMeterID(pSql, 0, &(pAlterSQL->name), 0) != TSDB_CODE_SUCCESS) { + if (setMeterID(pMeterMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } - int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo->name, DEFAULT_TABLE_INDEX); + int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -4087,13 +4099,11 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { char name[128] = {0}; strncpy(name, pVarList->a[0].pVar.pz, pVarList->a[0].pVar.nLen); - tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 0, TSDB_DATA_TYPE_INT, name, - tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize); + tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 0, TSDB_DATA_TYPE_INT, name, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize); memset(name, 0, tListLen(name)); strncpy(name, pVarList->a[1].pVar.pz, pVarList->a[1].pVar.nLen); - tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 1, TSDB_DATA_TYPE_INT, name, - tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize); + tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 1, TSDB_DATA_TYPE_INT, name, tDataTypeDesc[TSDB_DATA_TYPE_INT].nSize); } else if (pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { const char* msg1 = "invalid tag value"; const char* msg2 = "update normal column not supported"; @@ -4370,7 +4380,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) { int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuerySql) { SSqlCmd* pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); const char* msg0 = "soffset/offset can not be less than 0"; @@ -4560,18 +4570,18 @@ void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t t void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex) { SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentObj->cmd, subClauseIndex); - + if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex); - int32_t num = pQueryInfo->exprsInfo.numOfExprs; - + int32_t num = pQueryInfo->exprsInfo.numOfExprs; + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, num - 1); - + if (pExpr->functionId != TSDB_FUNC_TAG) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex); - int16_t columnInfo = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pMeterMetaInfo->pMeterMeta->uid); - SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo}; - SSchema* pSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta); + int16_t columnInfo = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pMeterMetaInfo->pMeterMeta->uid); + SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo}; + SSchema* pSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta); int16_t type = pSchema[index.columnIndex].type; int16_t bytes = pSchema[index.columnIndex].bytes; @@ -4956,7 +4966,7 @@ int32_t doLocalQueryProcess(SQuerySQL* pQuerySql, SSqlCmd* pCmd, int32_t subClau const char* msg2 = "invalid expression in select clause"; const char* msg3 = "invalid function"; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); tSQLExprList* pExprList = pQuerySql->pSelection; if (pExprList->nExpr != 1) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); @@ -5086,8 +5096,8 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) { // for debug purpose void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex) { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); - + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + if (pCmd == NULL || pQueryInfo->exprsInfo.numOfExprs == 0) { return; } @@ -5117,8 +5127,9 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p const char* msg1 = "invalid table name"; const char* msg2 = "table name too long"; - SSqlCmd* pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + SSqlCmd* pCmd = &pSql->cmd; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SCreateTableSQL* pCreateTable = pInfo->pCreateTableInfo; @@ -5134,7 +5145,7 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (setMeterID(pSql, 0, pzTableName, 0) != TSDB_CODE_SUCCESS) { + if (setMeterID(pMeterMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -5170,7 +5181,16 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { SSqlCmd* pCmd = &pSql->cmd; SCreateTableSQL* pCreateTable = pInfo->pCreateTableInfo; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + + // two table: the first one is for current table, and the secondary is for the super table. + tscAddEmptyMeterMetaInfo(pQueryInfo); + assert(pQueryInfo->numOfTables == 2); + + const int32_t TABLE_INDEX = 0; + const int32_t STABLE_INDEX = 1; + + SMeterMetaInfo* pStableMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, STABLE_INDEX); // super table name, create table by using dst SSQLToken* pToken = &(pCreateTable->usingInfo.stableName); @@ -5179,25 +5199,25 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (setMeterID(pSql, 0, pToken, 0) != TSDB_CODE_SUCCESS) { + if (setMeterID(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } // get meter meta from mnode - strncpy(pCreateTable->usingInfo.tagdata.name, pMeterMetaInfo->name, TSDB_METER_ID_LEN); + strncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, TSDB_METER_ID_LEN); tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals; - int32_t code = tscGetMeterMeta(pSql, pCreateTable->usingInfo.tagdata.name, 0); + int32_t code = tscGetMeterMeta(pSql, pStableMeterMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } - if (pMeterMetaInfo->pMeterMeta->numOfTags != pList->nExpr) { + if (pStableMeterMetaInfo->pMeterMeta->numOfTags != pList->nExpr) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } // too long tag values will return invalid sql, not be truncated automatically - SSchema* pTagSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta); + SSchema* pTagSchema = tsGetTagSchema(pStableMeterMetaInfo->pMeterMeta); char* tagVal = pCreateTable->usingInfo.tagdata.data; for (int32_t i = 0; i < pList->nExpr; ++i) { @@ -5220,7 +5240,8 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - int32_t ret = setMeterID(pSql, 0, &pInfo->pCreateTableInfo->name, 0); + SMeterMetaInfo* pTableMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, TABLE_INDEX); + int32_t ret = setMeterID(pTableMeterMetaInfo, &pInfo->pCreateTableInfo->name, pSql); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -5235,9 +5256,13 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { const char* msg4 = "fill option not supported in stream computing"; const char* msg5 = "sql too long"; // todo ADD support - SSqlCmd* pCmd = &pSql->cmd; + SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + assert(pQueryInfo->numOfTables == 1); + // if (pQueryInfo->numOfTables == 1) { + // tscAddEmptyMeterMetaInfo(pQueryInfo); + // } + SCreateTableSQL* pCreateTable = pInfo->pCreateTableInfo; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); @@ -5257,11 +5282,11 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } - if (setMeterID(pSql, 0, &srcToken, 0) != TSDB_CODE_SUCCESS) { + if (setMeterID(pMeterMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } - int32_t code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + int32_t code = tscGetMeterMeta(pSql, pMeterMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5292,7 +5317,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { } // set the created table[stream] name - if (setMeterID(pSql, 0, pzTableName, 0) != TSDB_CODE_SUCCESS) { + if (setMeterID(pMeterMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -5351,7 +5376,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { SSqlCmd* pCmd = &pSql->cmd; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); // too many result columns not support order by in query if (pQuerySql->pSelection->nExpr > TSDB_MAX_COLUMNS) { @@ -5390,25 +5416,26 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index); if (pQueryInfo->numOfTables <= i) { // more than one table tscAddEmptyMeterMetaInfo(pQueryInfo); } + SMeterMetaInfo* pMeterInfo1 = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); + SSQLToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz}; - if (setMeterID(pSql, index, &t, i) != TSDB_CODE_SUCCESS) { + if (setMeterID(pMeterInfo1, &t, pSql) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - SMeterMetaInfo* pMeterInfo1 = tscGetMeterMetaInfo(pCmd, index, i); - code = tscGetMeterMeta(pSql, pMeterInfo1->name, i); + code = tscGetMeterMeta(pSql, pMeterInfo1); if (code != TSDB_CODE_SUCCESS) { return code; } } + assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr); + // parse the group by clause in the first place - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index); if (parseGroupbyClause(pQueryInfo, pQuerySql->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } @@ -5431,8 +5458,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { // set sliding value SSQLToken* pSliding = &pQuerySql->sliding; if (pSliding->n != 0) { - // TODO refactor pCmd->count == 1 means sql in stream function - if (!tscEmbedded && pCmd->count == 0) { + if (!tscEmbedded && pCmd->inStream == 0) { // sliding only allowed in stream const char* msg = "not support sliding in query"; return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); } @@ -5476,8 +5502,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { // user does not specified the query time window, twa is not allowed in such case. if ((pQueryInfo->stime == 0 || pQueryInfo->etime == INT64_MAX || - (pQueryInfo->etime == INT64_MAX / 1000 && - pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI)) && + (pQueryInfo->etime == INT64_MAX / 1000 && pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9); } @@ -5492,26 +5517,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } - if (pQuerySql->fillType != NULL) { - if (pQueryInfo->nAggTimeInterval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); - } - - if (pQueryInfo->nAggTimeInterval > 0) { - int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime); - // number of result is not greater than 10,000,000 - if ((timeRange == 0) || - (timeRange / pQueryInfo->nAggTimeInterval) > MAX_RETRIEVE_ROWS_IN_INTERVAL_QUERY) { - return invalidSqlErrMsg(pQueryInfo->msg, msg6); - } - } - - int32_t ret = parseFillClause(pQueryInfo, pQuerySql); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - } - // in case of join query, time range is required. if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime); @@ -5538,6 +5543,29 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { updateTagColumnIndex(pQueryInfo, i); } + + /* + * fill options are set at the end position, when all columns are set properly + * the columns may be increased due to group by operation + */ + if (pQuerySql->fillType != NULL) { + if (pQueryInfo->nAggTimeInterval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); + } + + if (pQueryInfo->nAggTimeInterval > 0) { + int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime); + // number of result is not greater than 10,000,000 + if ((timeRange == 0) || (timeRange / pQueryInfo->nAggTimeInterval) > MAX_RETRIEVE_ROWS_IN_INTERVAL_QUERY) { + return invalidSqlErrMsg(pQueryInfo->msg, msg6); + } + } + + int32_t ret = parseFillClause(pQueryInfo, pQuerySql); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + } return TSDB_CODE_SUCCESS; // Does not build query message here } diff --git a/src/client/src/tscSQLParserImpl.c b/src/client/src/tscSQLParserImpl.c index 49353c6febadafc221d311d472e36d451d191492..261d2dc15317e131f31518d6b27bfc603da1ea7d 100644 --- a/src/client/src/tscSQLParserImpl.c +++ b/src/client/src/tscSQLParserImpl.c @@ -589,6 +589,8 @@ void destroyAllSelectClause(SSubclauseInfo *pClause) { SQuerySQL *pQuerySql = pClause->pClause[i]; doDestroyQuerySql(pQuerySql); } + + tfree(pClause->pClause); } SCreateTableSQL *tSetCreateSQLElems(tFieldList *pCols, tFieldList *pTags, SSQLToken *pStableName, diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 10bd477001ca3d12cf3e03344c2fc7702bd6c95c..cbec36b137aadd7b19aeffad7f957493d73bb0d3 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -139,7 +139,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { pSql->signature = pSql; pObj->pHb = pSql; tscAddSubqueryInfo(&pObj->pHb->cmd); - + tscTrace("%p pHb is allocated, pObj:%p", pObj->pHb, pObj); } @@ -640,9 +640,9 @@ static int tscLaunchMetricSubQueries(SSqlObj *pSql); // todo merge with callback int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) { - SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + SSqlCmd * pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + pSql->res.qhandle = 0x1; pSql->res.numOfRows = 0; @@ -665,9 +665,9 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); // refactor as one method - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNewQueryInfo != NULL); - + tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0); tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0); @@ -679,14 +679,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pNew->cmd.numOfCols = 0; pNewQueryInfo->nAggTimeInterval = 0; memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal)); - + // backup the data and clear it in the sqlcmd object pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr; memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); // set the ts,tags that involved in join, as the output column of intermediate result tscClearSubqueryInfo(&pNew->cmd); - + SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; @@ -710,7 +710,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu } } } else { - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; } @@ -757,23 +757,23 @@ int doProcessSql(SSqlObj *pSql) { } int tscProcessSql(SSqlObj *pSql) { - char * name = NULL; - SSqlRes * pRes = &pSql->res; - SSqlCmd * pCmd = &pSql->cmd; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - SMeterMetaInfo* pMeterMetaInfo = NULL; - int16_t type = 0; - + char * name = NULL; + SSqlRes *pRes = &pSql->res; + SSqlCmd *pCmd = &pSql->cmd; + + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SMeterMetaInfo *pMeterMetaInfo = NULL; + int16_t type = 0; + if (pQueryInfo != NULL) { pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); if (pMeterMetaInfo != NULL) { name = pMeterMetaInfo->name; } - + type = pQueryInfo->type; } - + tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type); pSql->retry = 0; if (pSql->cmd.command < TSDB_SQL_MGMT) { @@ -783,7 +783,7 @@ int tscProcessSql(SSqlObj *pSql) { pSql->maxRetry = 2; #endif - if (pMeterMetaInfo == NULL) { // the pMeterMetaInfo cannot be NULL + if (pMeterMetaInfo == NULL) { // the pMeterMetaInfo cannot be NULL pSql->res.code = TSDB_CODE_OTHERS; return pSql->res.code; } @@ -793,7 +793,7 @@ int tscProcessSql(SSqlObj *pSql) { } else { // it must be the parent SSqlObj for super table query if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { int32_t idx = pMeterMetaInfo->vnodeIndex; - + SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); pSql->index = pSidList->index; } @@ -808,7 +808,7 @@ int tscProcessSql(SSqlObj *pSql) { if (QUERY_IS_JOIN_QUERY(type)) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) { SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - + pState->numOfTotal = pQueryInfo->numOfTables; for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { @@ -908,7 +908,7 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t vnodeIndex, int32_t numOf int tscLaunchMetricSubQueries(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - + // pRes->code check only serves in launching metric sub-queries if (pRes->code == TSDB_CODE_QUERY_CANCELLED) { pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill metric function. @@ -920,10 +920,10 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { tColModel * pModel = NULL; pRes->qhandle = 1; // hack the qhandle check - - const uint32_t nBufferSize = (1 << 16); // 64KB - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + + const uint32_t nBufferSize = (1 << 16); // 64KB + + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); int32_t numOfVnodes = pMeterMetaInfo->pMetricMeta->numOfVnodes; assert(numOfVnodes > 0); @@ -939,7 +939,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { pSql->pSubs = malloc(POINTER_BYTES * numOfVnodes); pSql->numOfSubs = numOfVnodes; - + tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfVnodes); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); pState->numOfTotal = numOfVnodes; @@ -991,8 +991,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { // todo handle multi-vnode situation if (pQueryInfo->tsBuf) { - - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf); } @@ -1128,8 +1127,8 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; } else { // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); + if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code); } else { // regular super table query @@ -1159,9 +1158,9 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { return tscHandleSubRetrievalError(trsupport, pSql, numOfRows); } - SSqlRes * pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - + SSqlRes * pRes = &pSql->res; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); @@ -1177,7 +1176,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { #ifdef _DEBUG_VIEW printf("received data from vnode: %d rows\n", pRes->numOfRows); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); tscGetSrcColumnInfo(colInfo, pQueryInfo); tColModelDisplayEx(pDesc->pSchema, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); @@ -1209,8 +1208,8 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { #ifdef _DEBUG_VIEW printf("%ld rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); - + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); + tscGetSrcColumnInfo(colInfo, pQueryInfo); tColModelDisplayEx(pDesc->pSchema, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems, trsupport->localBuffer->numOfElems, colInfo); @@ -1224,8 +1223,8 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { // each result for a vnode is ordered as an independant list, // then used as an input of loser tree for disk-based merge routine - int32_t ret = - tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType); + int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, + pQueryInfo->groupbyExpr.orderType); if (ret != 0) { /* set no disk space error info, and abort retry */ return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); @@ -1241,9 +1240,9 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj, trsupport->pState->numOfTotal, trsupport->pState->numOfCompleted); - SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); + SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); tscClearInterpInfo(pPQueryInfo); - + tscCreateLocalReducer(trsupport->pExtMemBuffer, trsupport->pState->numOfTotal, pDesc, trsupport->pFinalColModel, &pPObj->cmd, &pPObj->res); tscTrace("%p build loser tree completed", pPObj); @@ -1319,7 +1318,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode); static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; assert(pQueryInfo->numOfTables == 1); @@ -1381,8 +1380,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { trsupport->pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; } else { - - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNewQueryInfo->pMeterInfo[0]->pMeterMeta != NULL && pNewQueryInfo->pMeterInfo[0]->pMetricMeta != NULL); tscProcessSql(pNew); return; @@ -1420,7 +1418,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { *((uint64_t *)pMsg) = pSql->res.qhandle; pMsg += sizeof(pSql->res.qhandle); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); *((uint16_t *)pMsg) = htons(pQueryInfo->type); pMsg += sizeof(pQueryInfo->type); @@ -1451,16 +1449,19 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char * pMsg, *pStart; int msgLen = 0; - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); - SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + + SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; pStart = pSql->cmd.payload + tsRpcHeadSize; pMsg = pStart; pShellMsg = (SShellSubmitMsg *)pMsg; - pShellMsg->import = pSql->cmd.import; + + pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1); pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); - pShellMsg->numOfSid = htonl(pSql->cmd.count); // number of meters to be inserted + pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; @@ -1493,7 +1494,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { */ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) { const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo); @@ -1570,8 +1571,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return -1; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); char * pStart = pCmd->payload + tsRpcHeadSize; @@ -1667,7 +1668,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs); if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) { - tscError("%p illegal value of number of output columns in query msg: %d", pSql, pQueryInfo->fieldsInfo.numOfOutputCols); + tscError("%p illegal value of number of output columns in query msg: %d", pSql, + pQueryInfo->fieldsInfo.numOfOutputCols); return -1; } @@ -2214,13 +2216,13 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SCreateTableMsg); - SCreateTableSQL* pCreateTableInfo = pInfo->pCreateTableInfo; + SCreateTableSQL *pCreateTableInfo = pInfo->pCreateTableInfo; if (pCreateTableInfo->type == TSQL_CREATE_TABLE_FROM_STABLE) { size += sizeof(STagData); } else { size += sizeof(SSchema) * (pCmd->numOfCols + pCmd->count); } - + if (pCreateTableInfo->pSelect != NULL) { size += (pCreateTableInfo->pSelect->selectToken.n + 1); } @@ -2235,9 +2237,9 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSchema * pSchema; int size = 0; - SSqlCmd * pCmd = &pSql->cmd; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SSqlCmd *pCmd = &pSql->cmd; + + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); // Reallocate the payload size @@ -2308,7 +2310,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); return minMsgSize() + sizeof(SMgmtHead) + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + TSDB_EXTRA_PAYLOAD_SIZE; } @@ -2319,9 +2321,9 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int msgLen = 0; int size = 0; - SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + SSqlCmd * pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); size = tscEstimateAlterTableMsgLength(pCmd); @@ -2337,12 +2339,12 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscGetDBInfoFromMeterId(pMeterMetaInfo->name, pMgmt->db); pMsg += sizeof(SMgmtHead); - SAlterTableSQL* pAlterInfo = pInfo->pAlterInfo; - + SAlterTableSQL *pAlterInfo = pInfo->pAlterInfo; + pAlterTableMsg = (SAlterTableMsg *)pMsg; strcpy(pAlterTableMsg->meterId, pMeterMetaInfo->name); pAlterTableMsg->type = htons(pAlterInfo->type); - + pAlterTableMsg->numOfCols = htons(tscNumOfFields(pQueryInfo)); memcpy(pAlterTableMsg->tagVal, pAlterInfo->tagData.data, TSDB_MAX_TAGS_LEN); @@ -2363,7 +2365,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pCmd->msgType = TSDB_MSG_TYPE_ALTER_TABLE; assert(msgLen + minMsgSize() <= size); - + return TSDB_CODE_SUCCESS; } @@ -2406,7 +2408,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SMgmtHead *pMgmt = (SMgmtHead *)pMsg; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); size_t nameLen = strlen(pMeterMetaInfo->name); @@ -2431,7 +2433,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -static int tscSetResultPointer(SQueryInfo* pQueryInfo, SSqlRes *pRes) { +static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) { if (tscCreateResPointerInfo(pQueryInfo, pRes) != TSDB_CODE_SUCCESS) { return pRes->code; } @@ -2461,9 +2463,9 @@ static int tscSetResultPointer(SQueryInfo* pQueryInfo, SSqlRes *pRes) { static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + pRes->code = TSDB_CODE_SUCCESS; if (pRes->rspType == 0) { @@ -2500,9 +2502,9 @@ int tscProcessDescribeTableRsp(SSqlObj *pSql) { } int tscProcessTagRetrieveRsp(SSqlObj *pSql) { - SSqlCmd * pCmd = &pSql->cmd; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SSqlCmd *pCmd = &pSql->cmd; + + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); int32_t numOfRes = 0; @@ -2519,7 +2521,7 @@ int tscProcessRetrieveMetricRsp(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; pRes->code = tscLocalDoReduce(pSql); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { tscSetResultPointer(pQueryInfo, pRes); @@ -2577,14 +2579,17 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char *tmpData = 0; if (pSql->cmd.allocSize > 0) { tmpData = calloc(1, pSql->cmd.allocSize); - if (NULL == tmpData) return -1; + if (NULL == tmpData) { + return TSDB_CODE_CLI_OUT_OF_MEMORY; + } + // STagData is in binary format, strncpy is not available memcpy(tmpData, pSql->cmd.payload, pSql->cmd.allocSize); } - SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - + SSqlCmd * pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); pMsg = pCmd->payload + tsRpcHeadSize; @@ -2597,10 +2602,10 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pInfoMsg = (SMeterInfoMsg *)pMsg; strcpy(pInfoMsg->meterId, pMeterMetaInfo->name); - pInfoMsg->createFlag = htons((uint16_t)pQueryInfo->defaultVal[0]); + pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0); pMsg += sizeof(SMeterInfoMsg); - if (pQueryInfo->defaultVal[0] != 0) { + if (pSql->cmd.createOnDemand) { memcpy(pInfoMsg->tags, tmpData, sizeof(STagData)); pMsg += sizeof(STagData); } @@ -2658,7 +2663,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) { const int32_t defaultSize = minMsgSize() + sizeof(SMetricMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); int32_t n = 0; for (int32_t i = 0; i < pQueryInfo->tagCond.numOfTagCond; ++i) { @@ -2684,9 +2689,9 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int msgLen = 0; int tableIndex = 0; - SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - + SSqlCmd * pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + STagCond *pTagCond = &pQueryInfo->tagCond; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex); @@ -2886,7 +2891,7 @@ int tscProcessRetrieveRspFromMgmt(SSqlObj *pSql) { pRes->data = pRetrieve->data; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); tscSetResultPointer(pQueryInfo, pRes); if (pRes->numOfRows == 0) { @@ -3185,7 +3190,7 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) { sizes[k] = pBuf - (char *)pNewMetricMeta; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); for (int32_t i = 0; i < num; ++i) { char name[TSDB_MAX_TAGS_LEN + 1] = {0}; @@ -3231,11 +3236,11 @@ int tscProcessShowRsp(SSqlObj *pSql) { SSchema * pSchema; char key[20]; - SSqlRes * pRes = &pSql->res; - SSqlCmd * pCmd = &pSql->cmd; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); //? - + SSqlRes *pRes = &pSql->res; + SSqlCmd *pCmd = &pSql->cmd; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); //? + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); pShow = (SShowRspMsg *)pRes->pRsp; @@ -3398,8 +3403,8 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp; pRes->numOfRows = htonl(pRetrieve->numOfRows); @@ -3429,15 +3434,15 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { } int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) { - SSqlRes * pRes = &pSql->res; - SSqlCmd * pCmd = &pSql->cmd; + SSqlRes * pRes = &pSql->res; + SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - + SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp; pRes->numOfRows = htonl(pRetrieve->numOfRows); pRes->data = pRetrieve->data; - + tscSetResultPointer(pQueryInfo, pRes); pRes->row = 0; return 0; @@ -3445,26 +3450,25 @@ int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) { void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code); -static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { +static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) { int32_t code = TSDB_CODE_SUCCESS; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - + SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); if (NULL == pNew) { tscError("%p malloc failed for new sqlobj to get meter meta", pSql); return TSDB_CODE_CLI_OUT_OF_MEMORY; } + pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_META; - pNew->cmd.payload = NULL; - pNew->cmd.allocSize = 0; tscAddSubqueryInfo(&pNew->cmd); - assert(pNew->cmd.numOfClause == 1); - - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - pNewQueryInfo->defaultVal[0] = pQueryInfo->defaultVal[0]; // flag of create table if not exists + + SQueryInfo *pNewQueryInfo = NULL; + tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo); + + pNew->cmd.createOnDemand = pSql->cmd.createOnDemand; // create table if not exists if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { tscError("%p malloc failed for payload to get meter meta", pSql); free(pNew); @@ -3472,10 +3476,11 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo); + SMeterMetaInfo *pNewMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo); + assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); - strcpy(pMeterMetaInfo->name, meterId); - memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE); + strcpy(pNewMeterMetaInfo->name, pMeterMetaInfo->name); + memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE); // tag information if table does not exists. tscTrace("%p new pSqlObj:%p to get meterMeta", pSql, pNew); if (pSql->fp == NULL) { @@ -3483,14 +3488,19 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { tsem_init(&pNew->emptyRspSem, 0, 1); code = tscProcessSql(pNew); - SMeterMetaInfo *pInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, index); - // update cache only on success get metermeta + /* + * Update cache only on succeeding in getting metermeta. + * Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine + */ if (code == TSDB_CODE_SUCCESS) { - pInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, meterId); + pMeterMetaInfo->pMeterMeta = pNewMeterMetaInfo->pMeterMeta; + pNewMeterMetaInfo->pMeterMeta = NULL; + + assert(pMeterMetaInfo->pMeterMeta != NULL); } - tscTrace("%p get meter meta complete, code:%d, pMeterMeta:%p", pSql, code, pInfo->pMeterMeta); + tscTrace("%p get meter meta complete, code:%d, pMeterMeta:%p", pSql, code, pMeterMetaInfo->pMeterMeta); tscFreeSqlObj(pNew); } else { @@ -3507,14 +3517,12 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { return code; } -int tscGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { - SSqlCmd *pCmd = &pSql->cmd; - - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, index); +int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) { + assert(strlen(pMeterMetaInfo->name) != 0); // if the SSqlCmd owns a metermeta, release it first taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false); - pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, meterId); + pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name); if (pMeterMetaInfo->pMeterMeta != NULL) { SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; @@ -3529,18 +3537,12 @@ int tscGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { * for async insert operation, release data block buffer before issue new object to get metermeta * because in metermeta callback function, the tscParse function will generate the submit data blocks */ - // if (pSql->fp != NULL && pSql->pStream == NULL) { - // tscFreeSqlCmdData(pCmd); - //} - - return tscDoGetMeterMeta(pSql, meterId, index); + return doGetMeterMetaFromServer(pSql, pMeterMetaInfo); } -int tscGetMeterMetaEx(SSqlObj *pSql, char *meterId, bool createIfNotExists) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - - pQueryInfo->defaultVal[0] = createIfNotExists ? 1 : 0; - return tscGetMeterMeta(pSql, meterId, 0); +int tscGetMeterMetaEx(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo, bool createIfNotExists) { + pSql->cmd.createOnDemand = createIfNotExists; + return tscGetMeterMeta(pSql, pMeterMetaInfo); } /* @@ -3563,12 +3565,12 @@ static void tscWaitingForCreateTable(SSqlCmd *pCmd) { * @return status code */ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { - int code = 0; - + int code = 0; + // handle metric meta renew process SSqlCmd *pCmd = &pSql->cmd; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); // enforce the renew metermeta operation in async model @@ -3583,10 +3585,11 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { tscTrace("%p update meter meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, pMeterMetaInfo->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, pMeterMetaInfo->pMeterMeta); } + tscWaitingForCreateTable(pCmd); taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true); - code = tscDoGetMeterMeta(pSql, meterId, 0); // todo ?? + code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo); // todo ?? } else { tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, pMeterMetaInfo->pMeterMeta->numOfTags, pCmd->numOfCols, pMeterMetaInfo->pMeterMeta->uid, @@ -3609,10 +3612,10 @@ int tscGetMetricMeta(SSqlObj *pSql) { /* * the vnode query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache. */ - bool reqMetricMeta = false; + bool reqMetricMeta = false; int32_t subClauseIndex = 0; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0}; @@ -3640,12 +3643,15 @@ int tscGetMetricMeta(SSqlObj *pSql) { pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_METRIC; - + + SQueryInfo *pNewQueryInfo = NULL; + tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo); + for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); - + SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name); - tscAddMeterMetaInfo(pQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex); + tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex); } if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { @@ -3653,8 +3659,6 @@ int tscGetMetricMeta(SSqlObj *pSql) { return code; } - // the query condition on meter is serialized into payload - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 2361e8f40e5f7b69ca5948aa35050a5c50447e2f..4b98780696bc1dbebeef90910e44b45b1ef5f60e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -298,8 +298,11 @@ int taos_num_fields(TAOS_RES *res) { if (pSql == NULL || pSql->signature != pSql) return 0; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + if (pQueryInfo == NULL) { + return 0; + } + SFieldInfo *pFieldsInfo = &pQueryInfo->fieldsInfo; - return (pFieldsInfo->numOfOutputCols - pFieldsInfo->numOfHiddenCols); } @@ -993,11 +996,8 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t int code = TSDB_CODE_INVALID_METER_ID; char *str = (char *)tblNameList; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - if (pQueryInfo == NULL) { - tscAddSubqueryInfo(pCmd); - pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - } + SQueryInfo* pQueryInfo = NULL; + tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo); SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); @@ -1034,7 +1034,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t return code; } - if ((code = setMeterID(pSql, 0, &sToken, 0)) != TSDB_CODE_SUCCESS) { + if ((code = setMeterID(pMeterMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index e5fdd65accac433de19b294b7008551737713cbf..f9ccd9f6e753f67048b6f108660226cd724e2b73 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -70,7 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - int code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); + int code = tscGetMeterMeta(pSql, pMeterMetaInfo); pSql->res.code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; @@ -82,7 +82,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; } - tscTansformSQLFunctionForMetricQuery(pQueryInfo); + tscTansformSQLFunctionForSTableQuery(pQueryInfo); // failed to get meter/metric meta, retry in 10sec. if (code != TSDB_CODE_SUCCESS) { @@ -391,6 +391,7 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { } pStream->slidingTime = pQueryInfo->nSlidingTime; + pQueryInfo->nAggTimeInterval = 0; // clear the interval value to avoid the force time window split by query processor } static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { @@ -507,8 +508,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return NULL; } - // TODO later refactor use enum - pSql->cmd.count = 1; // 1 means sql in stream, allowed the sliding clause. + pSql->cmd.inStream = 1; // 1 means sql in stream, allowed the sliding clause. pRes->code = tscToSQLCmd(pSql, &SQLInfo); SQLInfoDestroy(&SQLInfo); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c8a9e2b1e1ee9952149a253a670af42cb9605d25..1d0f030b2124deb6d69206273177a944c7cdd7e0 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -319,7 +319,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) { } pQueryInfo->interpoType = TSDB_INTERPO_NONE; - memset(pQueryInfo->defaultVal, 0, sizeof(pQueryInfo->defaultVal)); + tfree(pQueryInfo->defaultVal); } void tscClearSqlMetaInfoForce(SSqlCmd* pCmd) { @@ -398,20 +398,20 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { tfree(pSql->sqlstr); pthread_mutex_unlock(&pObj->mutex); - tfree(pSql->res.pRsp); - pSql->res.row = 0; - pSql->res.numOfRows = 0; - pSql->res.numOfTotal = 0; + tfree(pRes->pRsp); + pRes->row = 0; + pRes->numOfRows = 0; + pRes->numOfTotal = 0; - pSql->res.numOfGroups = 0; - tfree(pSql->res.pGroupRec); + pRes->numOfGroups = 0; + tfree(pRes->pGroupRec); tscDestroyLocalReducer(pSql); tfree(pSql->pSubs); pSql->numOfSubs = 0; tscDestroyResPointerInfo(pRes); - tfree(pSql->res.pColumnIndex); + tfree(pRes->pColumnIndex); tscFreeSqlCmdData(pCmd); } @@ -535,7 +535,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { SSqlCmd* pCmd = &pSql->cmd; assert(pDataBlock->pMeterMeta != NULL); - pCmd->count = pDataBlock->numOfMeters; + pCmd->numOfTablesInSubmit = pDataBlock->numOfMeters; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); // set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache @@ -1548,7 +1548,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1); + assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2); if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) { tscTrace("%p object should be release since all data blocks have been submit", pSql); @@ -1664,6 +1664,8 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList)); pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf); + + tfree(pQueryInfo->defaultVal); } void tscClearSubqueryInfo(SSqlCmd* pCmd) { @@ -1814,15 +1816,18 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo)); - pNewQueryInfo->pMeterInfo = NULL; memset(&pNewQueryInfo->colList, 0, sizeof(pNewQueryInfo->colList)); memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); - + + pNewQueryInfo->pMeterInfo = NULL; + pNewQueryInfo->defaultVal = NULL; pNewQueryInfo->numOfTables = 0; pNewQueryInfo->tsBuf = NULL; tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); + pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); + memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); @@ -1910,8 +1915,8 @@ void tscDoQuery(SSqlObj* pSql) { tscAddIntoSqlList(pSql); } - if (pCmd->isInsertFromFile == 1) { - tscProcessMultiVnodesInsertForFile(pSql); + if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { + tscProcessMultiVnodesInsertFromFile(pSql); } else { // pSql may be released in this function if it is a async insertion. tscProcessSql(pSql); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 5171ca596ffedcf61cf1f46984323d7919b050cc..1ec9422bbf492aba3fc0df639b382da5a593ed25 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -208,7 +208,7 @@ extern "C" { #define TSDB_MAX_RPC_THREADS 5 -#define TSDB_QUERY_TYPE_QUERY 0 // normal query +#define TSDB_QUERY_TYPE_NON_TYPE 0x00U // none type #define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01U // free qhandle at vnode /* @@ -224,6 +224,13 @@ extern "C" { #define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40U // select *,columns... query #define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80U // join sub query at the second stage +#define TSDB_QUERY_TYPE_INSERT 0x100U // insert type +#define TSDB_QUERY_TYPE_IMPORT 0x200U // import data + +#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) +#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) +#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE) + #define TSQL_SO_ASC 1 #define TSQL_SO_DESC 0 diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 8a15cc896051a2459d4fd874866109025e44fb6d..80dbda2ec53d39330a0634f4f047f60af9666b43 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -585,6 +585,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { SShellSubmitMsg *pSubmit = &shellSubmit; SShellSubmitBlock *pBlocks = NULL; + pSubmit->import = htons(pSubmit->import); pSubmit->vnode = htons(pSubmit->vnode); pSubmit->numOfSid = htonl(pSubmit->numOfSid);