未验证 提交 11dd3efd 编写于 作者: H hzcheng 提交者: GitHub

Merge pull request #752 from taosdata/feature/liaohj

Feature/liaohj
...@@ -127,7 +127,7 @@ int tscAllocPayload(SSqlCmd* pCmd, int size); ...@@ -127,7 +127,7 @@ int tscAllocPayload(SSqlCmd* pCmd, int size);
void tscFieldInfoSetValFromSchema(SFieldInfo* pFieldInfo, int32_t index, SSchema* pSchema); void tscFieldInfoSetValFromSchema(SFieldInfo* pFieldInfo, int32_t index, SSchema* pSchema);
void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* pField); void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* pField);
void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, char* name, int16_t bytes); void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes);
void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible); void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible);
void tscFieldInfoCalOffset(SSqlCmd* pCmd); void tscFieldInfoCalOffset(SSqlCmd* pCmd);
...@@ -143,7 +143,9 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo); ...@@ -143,7 +143,9 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex); void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex);
SSqlExpr* tscSqlExprInsert(SSqlCmd* pCmd, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprInsert(SSqlCmd* pCmd, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, /*int16_t colId,*/ int16_t interSize); int16_t size, int16_t interSize);
SSqlExpr* tscSqlExprInsertEmpty(SSqlCmd* pCmd, int32_t index, int16_t functionId);
SSqlExpr* tscSqlExprUpdate(SSqlCmd* pCmd, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SSqlExpr* tscSqlExprUpdate(SSqlCmd* pCmd, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size); int16_t size);
......
...@@ -92,7 +92,12 @@ enum _sql_cmd { ...@@ -92,7 +92,12 @@ enum _sql_cmd {
*/ */
TSDB_SQL_RETRIEVE_EMPTY_RESULT, TSDB_SQL_RETRIEVE_EMPTY_RESULT,
TSDB_SQL_RESET_CACHE, TSDB_SQL_RESET_CACHE, // 40
TSDB_SQL_SERV_STATUS,
TSDB_SQL_CURRENT_DB,
TSDB_SQL_SERV_VERSION,
TSDB_SQL_CLI_VERSION,
TSDB_SQL_CURRENT_USER,
TSDB_SQL_CFG_LOCAL, TSDB_SQL_CFG_LOCAL,
TSDB_SQL_MAX TSDB_SQL_MAX
......
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
#include "tschemautil.h" #include "tschemautil.h"
#include "tsocket.h" #include "tsocket.h"
static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, size_t valueLength);
static int32_t getToStringLength(const char *pData, int32_t length, int32_t type) { static int32_t getToStringLength(const char *pData, int32_t length, int32_t type) {
char buf[512] = {0}; char buf[512] = {0};
...@@ -39,7 +41,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type ...@@ -39,7 +41,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32_ #ifdef _TD_ARM_32_
double dv = 0; double dv = 0;
*(int64_t*)(&dv) = *(int64_t*)pData; *(int64_t *)(&dv) = *(int64_t *)pData;
len = sprintf(buf, "%f", dv); len = sprintf(buf, "%f", dv);
#else #else
len = sprintf(buf, "%lf", *(double *)pData); len = sprintf(buf, "%lf", *(double *)pData);
...@@ -47,12 +49,11 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type ...@@ -47,12 +49,11 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
if (strncasecmp("nan", buf, 3) == 0) { if (strncasecmp("nan", buf, 3) == 0) {
len = 4; len = 4;
} }
} } break;
break;
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
#ifdef _TD_ARM_32_ #ifdef _TD_ARM_32_
float fv = 0; float fv = 0;
*(int32_t*)(&fv) = *(int32_t*)pData; *(int32_t *)(&fv) = *(int32_t *)pData;
len = sprintf(buf, "%f", fv); len = sprintf(buf, "%f", fv);
#else #else
len = sprintf(buf, "%f", *(float *)pData); len = sprintf(buf, "%f", *(float *)pData);
...@@ -60,8 +61,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type ...@@ -60,8 +61,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
if (strncasecmp("nan", buf, 3) == 0) { if (strncasecmp("nan", buf, 3) == 0) {
len = 4; len = 4;
} }
} } break;
break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
len = sprintf(buf, "%lld", *(int64_t *)pData); len = sprintf(buf, "%lld", *(int64_t *)pData);
...@@ -203,23 +203,21 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { ...@@ -203,23 +203,21 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
#ifdef _TD_ARM_32_ #ifdef _TD_ARM_32_
float fv = 0; float fv = 0;
*(int32_t*)(&fv) = *(int32_t*)pTagValue; *(int32_t *)(&fv) = *(int32_t *)pTagValue;
sprintf(target, "%f", fv); sprintf(target, "%f", fv);
#else #else
sprintf(target, "%f", *(float *)pTagValue); sprintf(target, "%f", *(float *)pTagValue);
#endif #endif
} } break;
break;
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32_ #ifdef _TD_ARM_32_
double dv = 0; double dv = 0;
*(int64_t*)(&dv) = *(int64_t*)pTagValue; *(int64_t *)(&dv) = *(int64_t *)pTagValue;
sprintf(target, "%lf", dv); sprintf(target, "%lf", dv);
#else #else
sprintf(target, "%lf", *(double *)pTagValue); sprintf(target, "%lf", *(double *)pTagValue);
#endif #endif
} } break;
break;
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
sprintf(target, "%d", *(int8_t *)pTagValue); sprintf(target, "%d", *(int8_t *)pTagValue);
break; break;
...@@ -391,6 +389,68 @@ static int tscProcessQueryTags(SSqlObj *pSql) { ...@@ -391,6 +389,68 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
} }
} }
static void tscProcessCurrentUser(SSqlObj *pSql) {
SSqlExpr* pExpr = tscSqlExprGet(&pSql->cmd, 0);
tscSetLocalQueryResult(pSql, pSql->pTscObj->user, pExpr->aliasName, TSDB_USER_LEN);
}
static void tscProcessCurrentDB(SSqlObj *pSql) {
char db[TSDB_DB_NAME_LEN + 1] = {0};
extractDBName(pSql->pTscObj->db, db);
// no use db is invoked before.
if (strlen(db) == 0) {
setNull(db, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN);
}
SSqlExpr* pExpr = tscSqlExprGet(&pSql->cmd, 0);
tscSetLocalQueryResult(pSql, db, pExpr->aliasName, TSDB_DB_NAME_LEN);
}
static void tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SSqlExpr* pExpr = tscSqlExprGet(&pSql->cmd, 0);
tscSetLocalQueryResult(pSql, v, pExpr->aliasName, tListLen(pSql->pTscObj->sversion));
}
static void tscProcessClientVer(SSqlObj *pSql) {
SSqlExpr* pExpr = tscSqlExprGet(&pSql->cmd, 0);
tscSetLocalQueryResult(pSql, version, pExpr->aliasName, strlen(version));
}
static void tscProcessServStatus(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
if (pObj->pHb != NULL) {
if (pObj->pHb->res.code == TSDB_CODE_NETWORK_UNAVAIL) {
pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL;
return;
}
} else {
if (pSql->res.code == TSDB_CODE_NETWORK_UNAVAIL) {
return;
}
}
SSqlExpr* pExpr = tscSqlExprGet(&pSql->cmd, 0);
tscSetLocalQueryResult(pSql, "1", pExpr->aliasName, 2);
}
void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, size_t valueLength) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pCmd->numOfCols = 1;
pCmd->order.order = TSQL_SO_ASC;
tscFieldInfoSetValue(&pCmd->fieldsInfo, 0, TSDB_DATA_TYPE_BINARY, columnName, valueLength);
tscInitResObjForLocalQuery(pSql, 1, valueLength);
TAOS_FIELD *pField = tscFieldInfoGetField(pCmd, 0);
strncpy(pRes->data, val, pField->bytes);
}
int tscProcessLocalCmd(SSqlObj *pSql) { int tscProcessLocalCmd(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -402,13 +462,23 @@ int tscProcessLocalCmd(SSqlObj *pSql) { ...@@ -402,13 +462,23 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.code = (uint8_t)tscProcessQueryTags(pSql); pSql->res.code = (uint8_t)tscProcessQueryTags(pSql);
} else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { } else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
/* /*
* pass the qhandle check, in order to call partial release function to * set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to
* free allocated resources and remove the SqlObj from linked list * free allocated resources and remove the SqlObj from sql query linked list
*/ */
pSql->res.qhandle = 0x1; // pass the qhandle check pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0; pSql->res.numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) { } else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosClearDataCache(tscCacheHandle); taosClearDataCache(tscCacheHandle);
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
tscProcessClientVer(pSql);
} else if (pCmd->command == TSDB_SQL_CURRENT_USER) {
tscProcessCurrentUser(pSql);
} else if (pCmd->command == TSDB_SQL_CURRENT_DB) {
tscProcessCurrentDB(pSql);
} else if (pCmd->command == TSDB_SQL_SERV_STATUS) {
tscProcessServStatus(pSql);
} else { } else {
pSql->res.code = TSDB_CODE_INVALID_SQL; pSql->res.code = TSDB_CODE_INVALID_SQL;
tscError("%p not support command:%d", pSql, pCmd->command); tscError("%p not support command:%d", pSql, pCmd->command);
......
...@@ -117,7 +117,8 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString); ...@@ -117,7 +117,8 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString);
static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex); static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex);
static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex* pIndex); static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex* pIndex);
static int32_t doFunctionsCompatibleCheck(SSqlObj* pSql); static int32_t doFunctionsCompatibleCheck(SSqlObj* pSql);
static int32_t doLocalQueryProcess(SQuerySQL* pQuerySql, SSqlCmd* pCmd);
static int32_t tscQueryOnlyMetricTags(SSqlCmd* pCmd, bool* queryOnMetricTags) { static int32_t tscQueryOnlyMetricTags(SSqlCmd* pCmd, bool* queryOnMetricTags) {
assert(QUERY_IS_STABLE_QUERY(pCmd->type)); assert(QUERY_IS_STABLE_QUERY(pCmd->type));
...@@ -877,7 +878,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -877,7 +878,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSQL_QUERY_METER: { case TSQL_QUERY_METER: {
SQuerySQL* pQuerySql = pInfo->pQueryInfo; SQuerySQL* pQuerySql = pInfo->pQueryInfo;
assert(pQuerySql != NULL && pQuerySql->from->nExpr > 0); assert(pQuerySql != NULL && (pQuerySql->from == NULL || pQuerySql->from->nExpr > 0));
const char* msg0 = "invalid table name"; const char* msg0 = "invalid table name";
const char* msg1 = "table name too long"; const char* msg1 = "table name too long";
...@@ -895,6 +896,19 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -895,6 +896,19 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
setErrMsg(pCmd, msg8); setErrMsg(pCmd, msg8);
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
/*
* handle the sql expression without from subclause
* select current_database();
* select server_version();
* select client_version();
* select server_state();
*/
if (pQuerySql->from == NULL) {
assert(pQuerySql->fillType == NULL && pQuerySql->pGroupby == NULL && pQuerySql->pWhere == NULL &&
pQuerySql->pSortOrder == NULL);
return doLocalQueryProcess(pQuerySql, pCmd);
}
if (pQuerySql->from->nExpr > TSDB_MAX_JOIN_TABLE_NUM) { if (pQuerySql->from->nExpr > TSDB_MAX_JOIN_TABLE_NUM) {
setErrMsg(pCmd, msg7); setErrMsg(pCmd, msg7);
...@@ -1120,7 +1134,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) { ...@@ -1120,7 +1134,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQuerySQL* pQuerySql) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (pQuerySql->interval.type == 0) { if (pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4971,8 +4985,8 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd) { ...@@ -4971,8 +4985,8 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd) {
} }
typedef struct SDNodeDynConfOption { typedef struct SDNodeDynConfOption {
char* name; char* name; // command name
int32_t len; int32_t len; // name string length
} SDNodeDynConfOption; } SDNodeDynConfOption;
int32_t validateDNodeConfig(tDCLSQL* pOptions) { int32_t validateDNodeConfig(tDCLSQL* pOptions) {
...@@ -4980,7 +4994,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { ...@@ -4980,7 +4994,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
SDNodeDynConfOption DNODE_DYNAMIC_CFG_OPTIONS[14] = { const SDNodeDynConfOption DNODE_DYNAMIC_CFG_OPTIONS[14] = {
{"resetLog", 8}, {"resetQueryCache", 15}, {"dDebugFlag", 10}, {"rpcDebugFlag", 12}, {"resetLog", 8}, {"resetQueryCache", 15}, {"dDebugFlag", 10}, {"rpcDebugFlag", 12},
{"tmrDebugFlag", 12}, {"cDebugFlag", 10}, {"uDebugFlag", 10}, {"mDebugFlag", 10}, {"tmrDebugFlag", 12}, {"cDebugFlag", 10}, {"uDebugFlag", 10}, {"mDebugFlag", 10},
{"sdbDebugFlag", 12}, {"httpDebugFlag", 13}, {"monitorDebugFlag", 16}, {"qDebugflag", 10}, {"sdbDebugFlag", 12}, {"httpDebugFlag", 13}, {"monitorDebugFlag", 16}, {"qDebugflag", 10},
...@@ -4991,7 +5005,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { ...@@ -4991,7 +5005,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
if (pOptions->nTokens == 2) { if (pOptions->nTokens == 2) {
// reset log and reset query cache does not need value // reset log and reset query cache does not need value
for (int32_t i = 0; i < 2; ++i) { for (int32_t i = 0; i < 2; ++i) {
SDNodeDynConfOption* pOption = &DNODE_DYNAMIC_CFG_OPTIONS[i]; const SDNodeDynConfOption* pOption = &DNODE_DYNAMIC_CFG_OPTIONS[i];
if ((strncasecmp(pOption->name, pOptionToken->z, pOptionToken->n) == 0) && (pOption->len == pOptionToken->n)) { if ((strncasecmp(pOption->name, pOptionToken->z, pOptionToken->n) == 0) && (pOption->len == pOptionToken->n)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5014,7 +5028,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { ...@@ -5014,7 +5028,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
} }
for (int32_t i = 2; i < tListLen(DNODE_DYNAMIC_CFG_OPTIONS) - 1; ++i) { for (int32_t i = 2; i < tListLen(DNODE_DYNAMIC_CFG_OPTIONS) - 1; ++i) {
SDNodeDynConfOption* pOption = &DNODE_DYNAMIC_CFG_OPTIONS[i]; const SDNodeDynConfOption* pOption = &DNODE_DYNAMIC_CFG_OPTIONS[i];
if ((strncasecmp(pOption->name, pOptionToken->z, pOptionToken->n) == 0) && (pOption->len == pOptionToken->n)) { if ((strncasecmp(pOption->name, pOptionToken->z, pOptionToken->n) == 0) && (pOption->len == pOptionToken->n)) {
/* options is valid */ /* options is valid */
...@@ -5031,8 +5045,10 @@ int32_t validateLocalConfig(tDCLSQL* pOptions) { ...@@ -5031,8 +5045,10 @@ int32_t validateLocalConfig(tDCLSQL* pOptions) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
SDNodeDynConfOption LOCAL_DYNAMIC_CFG_OPTIONS[6] = {{"resetLog", 8}, {"rpcDebugFlag", 12}, {"tmrDebugFlag", 12}, SDNodeDynConfOption LOCAL_DYNAMIC_CFG_OPTIONS[6] = {
{"cDebugFlag", 10}, {"uDebugFlag", 10}, {"debugFlag", 9}}; {"resetLog", 8}, {"rpcDebugFlag", 12}, {"tmrDebugFlag", 12},
{"cDebugFlag", 10}, {"uDebugFlag", 10}, {"debugFlag", 9}
};
SSQLToken* pOptionToken = &pOptions->a[0]; SSQLToken* pOptionToken = &pOptions->a[0];
...@@ -5686,3 +5702,53 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { ...@@ -5686,3 +5702,53 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
return checkUpdateTagPrjFunctions(pCmd); return checkUpdateTagPrjFunctions(pCmd);
} }
} }
int32_t doLocalQueryProcess(SQuerySQL* pQuerySql, SSqlCmd* pCmd) {
const char* msg1 = "only one expression allowed";
const char* msg2 = "invalid expression in select clause";
const char* msg3 = "invalid function";
tSQLExprList* pExprList = pQuerySql->pSelection;
if (pExprList->nExpr != 1) {
setErrMsg(pCmd, msg1);
return TSDB_CODE_INVALID_SQL;
}
tSQLExpr* pExpr = pExprList->a[0].pNode;
if (pExpr->operand.z == NULL) {
setErrMsg(pCmd, msg2);
return TSDB_CODE_INVALID_SQL;
}
SDNodeDynConfOption functionsInfo[5] = {
{"database()", 10}, {"server_version()", 16}, {"server_status()", 15}, {"client_version()", 16}, {"current_user()", 14}
};
int32_t index = -1;
for(int32_t i = 0; i < tListLen(functionsInfo); ++i) {
if (strncasecmp(functionsInfo[i].name, pExpr->operand.z, functionsInfo[i].len) == 0 &&
functionsInfo[i].len == pExpr->operand.n) {
index = i;
break;
}
}
SSqlExpr* pExpr1 = tscSqlExprInsertEmpty(pCmd, 0, TSDB_FUNC_TAG_DUMMY);
if (pExprList->a[0].aliasName != NULL) {
strncpy(pExpr1->aliasName, pExprList->a[0].aliasName, tListLen(pExpr1->aliasName));
} else {
strncpy(pExpr1->aliasName, functionsInfo[index].name, tListLen(pExpr1->aliasName));
}
switch(index) {
case 0: pCmd->command = TSDB_SQL_CURRENT_DB;return TSDB_CODE_SUCCESS;
case 1: pCmd->command = TSDB_SQL_SERV_VERSION;return TSDB_CODE_SUCCESS;
case 2: pCmd->command = TSDB_SQL_SERV_STATUS;return TSDB_CODE_SUCCESS;
case 3: pCmd->command = TSDB_SQL_CLI_VERSION;return TSDB_CODE_SUCCESS;
case 4: pCmd->command = TSDB_SQL_CURRENT_USER;return TSDB_CODE_SUCCESS;
default: {
setErrMsg(pCmd, msg3);
return TSDB_CODE_INVALID_SQL;
}
}
}
...@@ -500,7 +500,7 @@ void tSQLSetColumnType(TAOS_FIELD *pField, SSQLToken *type) { ...@@ -500,7 +500,7 @@ void tSQLSetColumnType(TAOS_FIELD *pField, SSQLToken *type) {
SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection, tVariantList *pFrom, tSQLExpr *pWhere, SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection, tVariantList *pFrom, tSQLExpr *pWhere,
tVariantList *pGroupby, tVariantList *pSortOrder, SSQLToken *pInterval, tVariantList *pGroupby, tVariantList *pSortOrder, SSQLToken *pInterval,
SSQLToken *pSliding, tVariantList *pFill, SLimitVal *pLimit, SLimitVal *pGLimit) { SSQLToken *pSliding, tVariantList *pFill, SLimitVal *pLimit, SLimitVal *pGLimit) {
assert(pSelection != NULL && pFrom != NULL && pInterval != NULL && pLimit != NULL && pGLimit != NULL); assert(pSelection != NULL);
SQuerySQL *pQuery = calloc(1, sizeof(SQuerySQL)); SQuerySQL *pQuery = calloc(1, sizeof(SQuerySQL));
pQuery->selectToken = *pSelectToken; pQuery->selectToken = *pSelectToken;
...@@ -512,13 +512,23 @@ SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection, ...@@ -512,13 +512,23 @@ SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection,
pQuery->pSortOrder = pSortOrder; pQuery->pSortOrder = pSortOrder;
pQuery->pWhere = pWhere; pQuery->pWhere = pWhere;
pQuery->limit = *pLimit; if (pLimit != NULL) {
pQuery->slimit = *pGLimit; pQuery->limit = *pLimit;
}
if (pGLimit != NULL) {
pQuery->slimit = *pGLimit;
}
pQuery->interval = *pInterval; if (pInterval != NULL) {
pQuery->sliding = *pSliding; pQuery->interval = *pInterval;
}
if (pSliding != NULL) {
pQuery->sliding = *pSliding;
}
pQuery->fillType = pFill; pQuery->fillType = pFill;
return pQuery; return pQuery;
} }
......
...@@ -184,6 +184,15 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { ...@@ -184,6 +184,15 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
pSql->vnode = TSC_MGMT_VNODE; pSql->vnode = TSC_MGMT_VNODE;
#endif #endif
} }
// the pSql->res.code is the previous error(status) code.
if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
if (pSql->res.code != TSDB_CODE_SUCCESS && pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS) {
*pCode = pSql->res.code;
}
tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode);
}
} }
void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
...@@ -442,6 +451,9 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -442,6 +451,9 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
tscTrace("%p it shall be redirected!", pSql); tscTrace("%p it shall be redirected!", pSql);
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL; pSql->thandle = NULL;
// reset the retry times for a new mgmt node
pSql->retry = 0;
if (pCmd->command > TSDB_SQL_MGMT) { if (pCmd->command > TSDB_SQL_MGMT) {
tscProcessMgmtRedirect(pSql, pMsg->content + 1); tscProcessMgmtRedirect(pSql, pMsg->content + 1);
...@@ -3795,7 +3807,14 @@ void tscInitMsgs() { ...@@ -3795,7 +3807,14 @@ void tscInitMsgs() {
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode; // rsp handled by same function. tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode; // rsp handled by same function.
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp; tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp; tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessTagRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessTagRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp; tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp; tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp;
......
...@@ -246,9 +246,9 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) { ...@@ -246,9 +246,9 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL; pSql->thandle = NULL;
if (pRes->code != TSDB_CODE_SUCCESS) return pRes->code; if (pRes->code == TSDB_CODE_SUCCESS) {
tscDoQuery(pSql);
tscDoQuery(pSql); }
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj); tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
......
...@@ -720,7 +720,7 @@ static void evic(SFieldInfo* pFieldInfo, int32_t index) { ...@@ -720,7 +720,7 @@ static void evic(SFieldInfo* pFieldInfo, int32_t index) {
} }
} }
static void setValueImpl(TAOS_FIELD* pField, int8_t type, char* name, int16_t bytes) { static void setValueImpl(TAOS_FIELD* pField, int8_t type, const char* name, int16_t bytes) {
pField->type = type; pField->type = type;
strncpy(pField->name, name, TSDB_COL_NAME_LEN); strncpy(pField->name, name, TSDB_COL_NAME_LEN);
pField->bytes = bytes; pField->bytes = bytes;
...@@ -764,7 +764,7 @@ void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visib ...@@ -764,7 +764,7 @@ void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visib
} }
} }
void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, char* name, int16_t bytes) { void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes) {
ensureSpace(pFieldInfo, pFieldInfo->numOfOutputCols + 1); ensureSpace(pFieldInfo, pFieldInfo->numOfOutputCols + 1);
evic(pFieldInfo, index); evic(pFieldInfo, index);
...@@ -896,6 +896,19 @@ static void _exprEvic(SSqlExprInfo* pExprInfo, int32_t index) { ...@@ -896,6 +896,19 @@ static void _exprEvic(SSqlExprInfo* pExprInfo, int32_t index) {
} }
} }
SSqlExpr* tscSqlExprInsertEmpty(SSqlCmd* pCmd, int32_t index, int16_t functionId) {
SSqlExprInfo* pExprInfo = &pCmd->exprsInfo;
_exprCheckSpace(pExprInfo, pExprInfo->numOfExprs + 1);
_exprEvic(pExprInfo, index);
SSqlExpr* pExpr = &pExprInfo->pExprs[index];
pExpr->functionId = functionId;
pExprInfo->numOfExprs++;
return pExpr;
}
SSqlExpr* tscSqlExprInsert(SSqlCmd* pCmd, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprInsert(SSqlCmd* pCmd, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize) { int16_t size, int16_t interSize) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pColIndex->tableIndex); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pColIndex->tableIndex);
......
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
#include <stdbool.h> #include <stdbool.h>
#include "tsql.h" #include "tsql.h"
#include "tutil.h" #include "tutil.h"
} }
...@@ -113,7 +112,7 @@ cmd ::= DROP TABLE ifexists(Y) ids(X) cpxName(Z). { ...@@ -113,7 +112,7 @@ cmd ::= DROP TABLE ifexists(Y) ids(X) cpxName(Z). {
} }
cmd ::= DROP DATABASE ifexists(Y) ids(X). { setDCLSQLElems(pInfo, DROP_DATABASE, 2, &X, &Y); } cmd ::= DROP DATABASE ifexists(Y) ids(X). { setDCLSQLElems(pInfo, DROP_DATABASE, 2, &X, &Y); }
cmd ::= DROP DNODE IP(X). { setDCLSQLElems(pInfo, DROP_DNODE, 1, &X); } cmd ::= DROP DNODE IPTOKEN(X). { setDCLSQLElems(pInfo, DROP_DNODE, 1, &X); }
cmd ::= DROP USER ids(X). { setDCLSQLElems(pInfo, DROP_USER, 1, &X); } cmd ::= DROP USER ids(X). { setDCLSQLElems(pInfo, DROP_USER, 1, &X); }
cmd ::= DROP ACCOUNT ids(X). { setDCLSQLElems(pInfo, DROP_ACCOUNT, 1, &X); } cmd ::= DROP ACCOUNT ids(X). { setDCLSQLElems(pInfo, DROP_ACCOUNT, 1, &X); }
...@@ -129,8 +128,8 @@ cmd ::= DESCRIBE ids(X) cpxName(Y). { ...@@ -129,8 +128,8 @@ cmd ::= DESCRIBE ids(X) cpxName(Y). {
/////////////////////////////////THE ALTER STATEMENT//////////////////////////////////////// /////////////////////////////////THE ALTER STATEMENT////////////////////////////////////////
cmd ::= ALTER USER ids(X) PASS ids(Y). { setDCLSQLElems(pInfo, ALTER_USER_PASSWD, 2, &X, &Y); } cmd ::= ALTER USER ids(X) PASS ids(Y). { setDCLSQLElems(pInfo, ALTER_USER_PASSWD, 2, &X, &Y); }
cmd ::= ALTER USER ids(X) PRIVILEGE ids(Y). { setDCLSQLElems(pInfo, ALTER_USER_PRIVILEGES, 2, &X, &Y);} cmd ::= ALTER USER ids(X) PRIVILEGE ids(Y). { setDCLSQLElems(pInfo, ALTER_USER_PRIVILEGES, 2, &X, &Y);}
cmd ::= ALTER DNODE IP(X) ids(Y). { setDCLSQLElems(pInfo, ALTER_DNODE, 2, &X, &Y); } cmd ::= ALTER DNODE IPTOKEN(X) ids(Y). { setDCLSQLElems(pInfo, ALTER_DNODE, 2, &X, &Y); }
cmd ::= ALTER DNODE IP(X) ids(Y) ids(Z). { setDCLSQLElems(pInfo, ALTER_DNODE, 3, &X, &Y, &Z); } cmd ::= ALTER DNODE IPTOKEN(X) ids(Y) ids(Z). { setDCLSQLElems(pInfo, ALTER_DNODE, 3, &X, &Y, &Z); }
cmd ::= ALTER LOCAL ids(X). { setDCLSQLElems(pInfo, ALTER_LOCAL, 1, &X); } cmd ::= ALTER LOCAL ids(X). { setDCLSQLElems(pInfo, ALTER_LOCAL, 1, &X); }
cmd ::= ALTER LOCAL ids(X) ids(Y). { setDCLSQLElems(pInfo, ALTER_LOCAL, 2, &X, &Y); } cmd ::= ALTER LOCAL ids(X) ids(Y). { setDCLSQLElems(pInfo, ALTER_LOCAL, 2, &X, &Y); }
cmd ::= ALTER DATABASE ids(X) alter_db_optr(Y). { SSQLToken t = {0}; setCreateDBSQL(pInfo, ALTER_DATABASE, &X, &Y, &t);} cmd ::= ALTER DATABASE ids(X) alter_db_optr(Y). { SSQLToken t = {0}; setCreateDBSQL(pInfo, ALTER_DATABASE, &X, &Y, &t);}
...@@ -155,7 +154,7 @@ ifnotexists(X) ::= . {X.n = 0;} ...@@ -155,7 +154,7 @@ ifnotexists(X) ::= . {X.n = 0;}
/////////////////////////////////THE CREATE STATEMENT/////////////////////////////////////// /////////////////////////////////THE CREATE STATEMENT///////////////////////////////////////
//create option for dnode/db/user/account //create option for dnode/db/user/account
cmd ::= CREATE DNODE IP(X). { setDCLSQLElems(pInfo, CREATE_DNODE, 1, &X);} cmd ::= CREATE DNODE IPTOKEN(X). { setDCLSQLElems(pInfo, CREATE_DNODE, 1, &X);}
cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z).
{ setCreateAcctSQL(pInfo, CREATE_ACCOUNT, &X, &Y, &Z);} { setCreateAcctSQL(pInfo, CREATE_ACCOUNT, &X, &Y, &Z);}
cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDBSQL(pInfo, CREATE_DATABASE, &X, &Y, &Z);} cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDBSQL(pInfo, CREATE_DATABASE, &X, &Y, &Z);}
...@@ -350,6 +349,14 @@ select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) fill_ ...@@ -350,6 +349,14 @@ select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) fill_
A = tSetQuerySQLElems(&T, W, X, Y, P, Z, &K, &S, F, &L, &G); A = tSetQuerySQLElems(&T, W, X, Y, P, Z, &K, &S, F, &L, &G);
} }
// Support for the SQL exprssion without from & where subclauses, e.g.,
// select current_database(),
// select server_version(), select client_version(),
// select server_state();
select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySQLElems(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
}
// selcollist is a list of expressions that are to become the return // selcollist is a list of expressions that are to become the return
// values of the SELECT statement. The "*" in statements like // values of the SELECT statement. The "*" in statements like
// "SELECT * FROM ..." is encoded as a special expression with an opcode of TK_ALL. // "SELECT * FROM ..." is encoded as a special expression with an opcode of TK_ALL.
...@@ -392,7 +399,7 @@ tmvar(A) ::= VARIABLE(X). {A = X;} ...@@ -392,7 +399,7 @@ tmvar(A) ::= VARIABLE(X). {A = X;}
%type interval_opt {SSQLToken} %type interval_opt {SSQLToken}
interval_opt(N) ::= INTERVAL LP tmvar(E) RP. {N = E; } interval_opt(N) ::= INTERVAL LP tmvar(E) RP. {N = E; }
interval_opt(N) ::= . {N.n = 0; } interval_opt(N) ::= . {N.n = 0; N.z = NULL; N.type = 0; }
%type fill_opt {tVariantList*} %type fill_opt {tVariantList*}
%destructor fill_opt {tVariantListDestroy($$);} %destructor fill_opt {tVariantListDestroy($$);}
...@@ -413,7 +420,7 @@ fill_opt(N) ::= FILL LP ID(Y) RP. { ...@@ -413,7 +420,7 @@ fill_opt(N) ::= FILL LP ID(Y) RP. {
%type sliding_opt {SSQLToken} %type sliding_opt {SSQLToken}
sliding_opt(K) ::= SLIDING LP tmvar(E) RP. {K = E; } sliding_opt(K) ::= SLIDING LP tmvar(E) RP. {K = E; }
sliding_opt(K) ::= . {K.n = 0; } sliding_opt(K) ::= . {K.n = 0; K.z = NULL; K.type = 0; }
%type orderby_opt {tVariantList*} %type orderby_opt {tVariantList*}
%destructor orderby_opt {tVariantListDestroy($$);} %destructor orderby_opt {tVariantListDestroy($$);}
...@@ -642,12 +649,12 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). { ...@@ -642,12 +649,12 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
} }
////////////////////////////////////////kill statement/////////////////////////////////////// ////////////////////////////////////////kill statement///////////////////////////////////////
cmd ::= KILL CONNECTION IP(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setDCLSQLElems(pInfo, KILL_CONNECTION, 1, &X);} cmd ::= KILL CONNECTION IPTOKEN(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setDCLSQLElems(pInfo, KILL_CONNECTION, 1, &X);}
cmd ::= KILL STREAM IP(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setDCLSQLElems(pInfo, KILL_STREAM, 1, &X);} cmd ::= KILL STREAM IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setDCLSQLElems(pInfo, KILL_STREAM, 1, &X);}
cmd ::= KILL QUERY IP(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setDCLSQLElems(pInfo, KILL_QUERY, 1, &X);} cmd ::= KILL QUERY IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setDCLSQLElems(pInfo, KILL_QUERY, 1, &X);}
%fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED %fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
LIKE MATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL LIKE MATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL
COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF
SPREAD TWA INTERP LAST_ROW NOW IP SEMI NONE PREV LINEAR IMPORT METRIC TBNAME JOIN METRICS STABLE. SPREAD TWA INTERP LAST_ROW NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT METRIC TBNAME JOIN METRICS STABLE NULL.
\ No newline at end of file \ No newline at end of file
...@@ -31,6 +31,7 @@ extern "C" { ...@@ -31,6 +31,7 @@ extern "C" {
#define TK_OCT 204 // oct number #define TK_OCT 204 // oct number
#define TK_BIN 205 // bin format data 0b111 #define TK_BIN 205 // bin format data 0b111
#define TK_FILE 206 #define TK_FILE 206
#define TK_QUESTION 207 // denoting the placeholder of "?",when invoking statement bind query
#define TSQL_SO_ASC 1 #define TSQL_SO_ASC 1
#define TSQL_SO_DESC 0 #define TSQL_SO_DESC 0
......
...@@ -80,7 +80,7 @@ ...@@ -80,7 +80,7 @@
#define TK_TABLE 62 #define TK_TABLE 62
#define TK_DATABASE 63 #define TK_DATABASE 63
#define TK_DNODE 64 #define TK_DNODE 64
#define TK_IP 65 #define TK_IPTOKEN 65
#define TK_USER 66 #define TK_USER 66
#define TK_ACCOUNT 67 #define TK_ACCOUNT 67
#define TK_USE 68 #define TK_USE 68
...@@ -210,7 +210,6 @@ ...@@ -210,7 +210,6 @@
#define TK_JOIN 192 #define TK_JOIN 192
#define TK_METRICS 193 #define TK_METRICS 193
#define TK_STABLE 194 #define TK_STABLE 194
#define TK_QUESTION 195
#endif #endif
......
...@@ -81,6 +81,8 @@ int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state); ...@@ -81,6 +81,8 @@ int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state); void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state);
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state); bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeSetMeterDeleting(SMeterObj* pMeterObj); void vnodeSetMeterDeleting(SMeterObj* pMeterObj);
int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st);
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid); bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid);
void vnodeFreeColumnInfo(SColumnInfo* pColumnInfo); void vnodeFreeColumnInfo(SColumnInfo* pColumnInfo);
bool isGroupbyNormalCol(SSqlGroupbyExpr* pExpr); bool isGroupbyNormalCol(SSqlGroupbyExpr* pExpr);
......
...@@ -286,12 +286,9 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { ...@@ -286,12 +286,9 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
SShellObj *pShell = pImport->pShell; SShellObj *pShell = pImport->pShell;
pImport->retry++; pImport->retry++;
//slow query will block the import operation int32_t code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING);
int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING); if (code == TSDB_CODE_NOT_ACTIVE_TABLE) {
if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state);
return; return;
} }
...@@ -303,14 +300,14 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { ...@@ -303,14 +300,14 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
//if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY //if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY
int32_t commitInProcess = 0; int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex); pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) { if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (pImport->retry < 1000) { if (pImport->retry < 1000) {
dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready." dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId, "commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
commitInProcess, num, state); commitInProcess, num, pObj->state);
taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl); taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl);
return; return;
...@@ -320,15 +317,14 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { ...@@ -320,15 +317,14 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
} else { } else {
pPool->commitInProcess = 1; pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
int code = vnodeImportData(pObj, pImport); int32_t ret = vnodeImportData(pObj, pImport);
if (pShell) { if (pShell) {
pShell->code = code; pShell->code = ret;
pShell->numOfTotalPoints += pImport->importedRows; pShell->numOfTotalPoints += pImport->importedRows;
} }
} }
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++; pVnode->version++;
// send response back to shell // send response back to shell
...@@ -702,8 +698,8 @@ int vnodeFindKeyInFile(SImportInfo *pImport, int order) { ...@@ -702,8 +698,8 @@ int vnodeFindKeyInFile(SImportInfo *pImport, int order) {
if (pImport->key != key && pImport->pos > 0) { if (pImport->key != key && pImport->pos > 0) {
if ( pObj->sversion != pBlock->sversion ) { if ( pObj->sversion != pBlock->sversion ) {
dError("vid:%d sid:%d id:%s, import sversion not matached, expected:%d received:%d", pObj->vnode, pObj->sid, dError("vid:%d sid:%d id:%s, import sversion not matched, expected:%d received:%d", pObj->vnode, pObj->sid,
pBlock->sversion, pObj->sversion); pObj->meterId, pBlock->sversion, pObj->sversion);
code = TSDB_CODE_OTHERS; code = TSDB_CODE_OTHERS;
} else { } else {
pImport->offset = pBlock->offset; pImport->offset = pBlock->offset;
...@@ -912,16 +908,12 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -912,16 +908,12 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} }
if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) {
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now);
if (pShell) { if (pShell) {
pShell->code = code; pShell->code = code;
pShell->numOfTotalPoints += pointsImported; pShell->numOfTotalPoints += pointsImported;
} }
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else { } else {
SImportInfo *pNew, import; SImportInfo *pNew, import;
...@@ -933,7 +925,11 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -933,7 +925,11 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
import.pShell = pShell; import.pShell = pShell;
import.payload = payload; import.payload = payload;
import.rows = rows; import.rows = rows;
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) {
return code;
}
int32_t num = 0; int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex); pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries; num = pObj->numOfQueries;
...@@ -944,7 +940,8 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -944,7 +940,8 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pthread_mutex_lock(&pPool->vmutex); pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); pNew = (SImportInfo *)malloc(sizeof(SImportInfo));
memcpy(pNew, &import, sizeof(SImportInfo)); memcpy(pNew, &import, sizeof(SImportInfo));
pNew->signature = pNew; pNew->signature = pNew;
...@@ -956,19 +953,25 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -956,19 +953,25 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
pObj->meterId, commitInProcess, pObj->numOfQueries); pObj->meterId, commitInProcess, pObj->numOfQueries);
/*
* vnodeProcessImportTimer will set the import status for this table, so need to
* set the import flag here
*/
taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl);
return 0; return 0;
} else { } else {
pPool->commitInProcess = 1; pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex); pthread_mutex_unlock(&pPool->vmutex);
int code = vnodeImportData(pObj, &import);
int ret = vnodeImportData(pObj, &import);
if (pShell) { if (pShell) {
pShell->code = code; pShell->code = ret;
pShell->numOfTotalPoints += import.importedRows; pShell->numOfTotalPoints += import.importedRows;
} }
} }
} }
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++; pVnode->version++;
if (pShell) { if (pShell) {
......
...@@ -572,7 +572,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -572,7 +572,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dTrace("vid:%d sid:%d id:%s, cache is full, freePoints:%d, notFreeSlots:%d", pObj->vnode, pObj->sid, pObj->meterId, dTrace("vid:%d sid:%d id:%s, cache is full, freePoints:%d, notFreeSlots:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->freePoints, pPool->notFreeSlots); pObj->freePoints, pPool->notFreeSlots);
vnodeProcessCommitTimer(pVnode, NULL); vnodeProcessCommitTimer(pVnode, NULL);
return TSDB_CODE_ACTION_IN_PROGRESS; return code;
} }
// FIXME: Here should be after the comparison of sversions. // FIXME: Here should be after the comparison of sversions.
...@@ -608,7 +608,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -608,7 +608,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, numOfPoints,firstKey, lastKey, minAllowedKey, maxAllowedKey); pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, numOfPoints,firstKey, lastKey, minAllowedKey, maxAllowedKey);
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
} }
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_INSERT)) != TSDB_CODE_SUCCESS) {
goto _over;
}
for (i = 0; i < numOfPoints; ++i) { for (i = 0; i < numOfPoints; ++i) {
// meter will be dropped, abort current insertion // meter will be dropped, abort current insertion
if (pObj->state >= TSDB_METER_STATE_DELETING) { if (pObj->state >= TSDB_METER_STATE_DELETING) {
...@@ -654,6 +658,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -654,6 +658,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pthread_mutex_unlock(&(pVnode->vmutex)); pthread_mutex_unlock(&(pVnode->vmutex));
_over: _over:
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
dTrace("vid:%d sid:%d id:%s, %d out of %d points are inserted, lastKey:%ld source:%d, vnode total storage: %ld", dTrace("vid:%d sid:%d id:%s, %d out of %d points are inserted, lastKey:%ld source:%d, vnode total storage: %ld",
pObj->vnode, pObj->sid, pObj->meterId, points, numOfPoints, pObj->lastKey, source, pObj->vnode, pObj->sid, pObj->meterId, points, numOfPoints, pObj->lastKey, source,
pVnode->vnodeStatistic.totalStorage); pVnode->vnodeStatistic.totalStorage);
......
...@@ -565,40 +565,15 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -565,40 +565,15 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int sversion = htonl(pBlocks->sversion); int sversion = htonl(pBlocks->sversion);
int32_t state = TSDB_METER_STATE_READY;
if (pSubmit->import) { if (pSubmit->import) {
state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
} else { } else {
state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_INSERT); code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
} }
if (state == TSDB_METER_STATE_READY) { if (code != TSDB_CODE_SUCCESS) {break;}
// meter status is ready for insert/import
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
} else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
}
if (code != TSDB_CODE_SUCCESS) {break;}
} else {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pMeterObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
break;
}
}
numOfTotalPoints += numOfPoints; numOfTotalPoints += numOfPoints;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
......
...@@ -55,14 +55,11 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -55,14 +55,11 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
contLen += sizeof(SSubmitMsg); contLen += sizeof(SSubmitMsg);
int32_t numOfPoints = 0; int32_t numOfPoints = 0;
int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion,
&numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision));
int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); if (code != TSDB_CODE_SUCCESS) {
if (state == TSDB_METER_STATE_READY) { dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId);
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision));
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else {
dError("vid:%d sid:%d id:%s, failed to insert continuous query results, state:%d", pObj->vnode, pObj->sid,
pObj->meterId, state);
} }
assert(numOfPoints >= 0 && numOfPoints <= 1); assert(numOfPoints >= 0 && numOfPoints <= 1);
......
...@@ -668,6 +668,26 @@ void vnodeSetMeterDeleting(SMeterObj* pMeterObj) { ...@@ -668,6 +668,26 @@ void vnodeSetMeterDeleting(SMeterObj* pMeterObj) {
pMeterObj->state |= TSDB_METER_STATE_DELETING; pMeterObj->state |= TSDB_METER_STATE_DELETING;
} }
int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t state = vnodeSetMeterState(pObj, st);
if (state != TSDB_METER_STATE_READY) {//return to denote import is not performed
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, meter is deleted, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pObj->vnode, pObj->sid,
pObj->meterId, pObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
}
return code;
}
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) { bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) {
SMeterObj* pObj = pVnode->meterList[sid]; SMeterObj* pObj = pVnode->meterList[sid];
......
此差异已折叠。
...@@ -96,7 +96,6 @@ static SKeyword keywordTable[] = { ...@@ -96,7 +96,6 @@ static SKeyword keywordTable[] = {
{"TABLE", TK_TABLE}, {"TABLE", TK_TABLE},
{"DATABASE", TK_DATABASE}, {"DATABASE", TK_DATABASE},
{"DNODE", TK_DNODE}, {"DNODE", TK_DNODE},
{"IP", TK_IP},
{"USER", TK_USER}, {"USER", TK_USER},
{"ACCOUNT", TK_ACCOUNT}, {"ACCOUNT", TK_ACCOUNT},
{"USE", TK_USE}, {"USE", TK_USE},
...@@ -523,7 +522,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) { ...@@ -523,7 +522,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
} }
if (seg == 4) { // ip address if (seg == 4) { // ip address
*tokenType = TK_IP; *tokenType = TK_IPTOKEN;
return i; return i;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册