提交 4db1086e 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/feature/query' into feature/query

...@@ -162,7 +162,7 @@ Master Vnode遵循下面的写入流程: ...@@ -162,7 +162,7 @@ Master Vnode遵循下面的写入流程:
<center> 图 3 TDengine Master写入流程 </center> <center> 图 3 TDengine Master写入流程 </center>
1. Master vnode收到应用的数据插入请求,验证OK,进入下一步; 1. Master vnode收到应用的数据插入请求,验证OK,进入下一步;
2. 如果系统配置参数walLevel打开(设置为2),vnode将把该请求的原始数据包写入数据库日志文件WAL,以保证TDengine能够在断电等因素导致的服务重启时从数据库日志文件中恢复数据,避免数据的丢失; 2. 如果系统配置参数walLevel大于0,vnode将把该请求的原始数据包写入数据库日志文件WAL。如果walLevel设置为2,而且fsync设置为0,TDengine还将WAL数据立即落盘,以保证即使宕机,也能从数据库日志文件中恢复数据,避免数据的丢失;
3. 如果有多个副本,vnode将把数据包转发给同一虚拟节点组内slave vnodes, 该转发包带有数据的版本号(version); 3. 如果有多个副本,vnode将把数据包转发给同一虚拟节点组内slave vnodes, 该转发包带有数据的版本号(version);
4. 写入内存,并加记录加入到skip list; 4. 写入内存,并加记录加入到skip list;
5. Master vnode返回确认信息给应用,表示写入成功。 5. Master vnode返回确认信息给应用,表示写入成功。
...@@ -174,7 +174,7 @@ Master Vnode遵循下面的写入流程: ...@@ -174,7 +174,7 @@ Master Vnode遵循下面的写入流程:
<center> 图 4 TDengine Slave写入流程 </center> <center> 图 4 TDengine Slave写入流程 </center>
1. Slave vnode收到Master vnode转发了的数据插入请求。 1. Slave vnode收到Master vnode转发了的数据插入请求。
2. 如果系统配置参数walLevl设置为2,vnode将把该请求的原始数据包写入日志(WAL) 2. 如果系统配置参数walLevel大于0,vnode将把该请求的原始数据包写入数据库日志文件WAL。如果walLevel设置为2,而且fsync设置为0,TDengine还将WAL数据立即落盘,以保证即使宕机,也能从数据库日志文件中恢复数据,避免数据的丢失
3. 写入内存,更新内存中的skip list。 3. 写入内存,更新内存中的skip list。
与Master vnode相比,slave vnode不存在转发环节,也不存在回复确认环节,少了两步。但写内存与WAL是完全一样的。 与Master vnode相比,slave vnode不存在转发环节,也不存在回复确认环节,少了两步。但写内存与WAL是完全一样的。
......
...@@ -399,7 +399,7 @@ int tsParseSql(SSqlObj *pSql, bool initial); ...@@ -399,7 +399,7 @@ int tsParseSql(SSqlObj *pSql, bool initial);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
int tscRenewTableMeta(SSqlObj *pSql, char *tableId); int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex);
void tscQueueAsyncRes(SSqlObj *pSql); void tscQueueAsyncRes(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param, int32_t code); void tscQueueAsyncError(void(*fp), void *param, int32_t code);
...@@ -414,7 +414,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); ...@@ -414,7 +414,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes); void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscResetSqlCmdObj(SSqlCmd *pCmd); void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
/** /**
* free query result of the sql object * free query result of the sql object
...@@ -456,6 +456,7 @@ bool tscResultsetFetchCompleted(TAOS_RES *result); ...@@ -456,6 +456,7 @@ bool tscResultsetFetchCompleted(TAOS_RES *result);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd); char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
......
...@@ -468,7 +468,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -468,7 +468,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) { if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql); tscDebug("%p redo parse sql string and proceed", pSql);
pCmd->parseFinished = false; pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd); tscResetSqlCmdObj(pCmd, false);
code = tsParseSql(pSql, true); code = tsParseSql(pSql, true);
......
...@@ -180,7 +180,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload, ...@@ -180,7 +180,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload,
} else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) { } else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
*(uint8_t *)payload = TSDB_DATA_BOOL_NULL; *(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
} else { } else {
return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z); return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
} }
} else if (pToken->type == TK_INTEGER) { } else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10); iv = strtoll(pToken->z, NULL, 10);
...@@ -439,8 +439,8 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ ...@@ -439,8 +439,8 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
int16_t type = sToken.type; int16_t type = sToken.type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) { type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) {
tscInvalidSQLErrMsg(error, "invalid data or symbol", sToken.z); tscSQLSyntaxErrMsg(error, "invalid data or symbol", sToken.z);
*code = TSDB_CODE_TSC_INVALID_SQL; *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1; return -1;
} }
...@@ -472,7 +472,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ ...@@ -472,7 +472,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec); int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
*code = TSDB_CODE_TSC_INVALID_SQL; *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1; // NOTE: here 0 mean error! return -1; // NOTE: here 0 mean error!
} }
...@@ -568,8 +568,8 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe ...@@ -568,8 +568,8 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
sToken = tStrGetToken(*str, &index, false, 0, NULL); sToken = tStrGetToken(*str, &index, false, 0, NULL);
*str += index; *str += index;
if (sToken.n == 0 || sToken.type != TK_RP) { if (sToken.n == 0 || sToken.type != TK_RP) {
tscInvalidSQLErrMsg(error, ") expected", *str); tscSQLSyntaxErrMsg(error, ") expected", *str);
*code = TSDB_CODE_TSC_INVALID_SQL; *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1; return -1;
} }
...@@ -578,7 +578,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe ...@@ -578,7 +578,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
if (numOfRows <= 0) { if (numOfRows <= 0) {
strcpy(error, "no any data points"); strcpy(error, "no any data points");
*code = TSDB_CODE_TSC_INVALID_SQL; *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1; return -1;
} else { } else {
return numOfRows; return numOfRows;
...@@ -943,7 +943,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -943,7 +943,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sToken = tStrGetToken(sql, &index, false, 0, NULL); sToken = tStrGetToken(sql, &index, false, 0, NULL);
sql += index; sql += index;
if (sToken.n == 0 || sToken.type != TK_RP) { if (sToken.n == 0 || sToken.type != TK_RP) {
return tscInvalidSQLErrMsg(pCmd->payload, ") expected", sToken.z); return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z);
} }
pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen) + pTag->dataLen; pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen) + pTag->dataLen;
...@@ -1327,18 +1327,40 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1327,18 +1327,40 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
pSql->fetchFp = pSql->fp; pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert; pSql->fp = (void(*)())tscHandleMultivnodeInsert;
} }
if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) { if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) {
return ret; return ret;
} }
// make a backup as tsParseInsertSql may modify the string
char* sqlstr = strdup(pSql->sqlstr);
ret = tsParseInsertSql(pSql); ret = tsParseInsertSql(pSql);
if (sqlstr == NULL || pSql->retry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) {
free(sqlstr);
} else {
tscResetSqlCmdObj(pCmd, true);
free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
pSql->retry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
ret = tsParseInsertSql(pSql);
}
}
} else { } else {
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr); SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo); ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->retry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
tscResetSqlCmdObj(pCmd, true);
pSql->retry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
}
SQLInfoDestroy(&SQLInfo); SQLInfoDestroy(&SQLInfo);
} }
if (ret == TSDB_CODE_SUCCESS) {
pSql->retry = 0;
}
/* /*
* the pRes->code may be modified or released by another thread in tscTableMetaCallBack function, * the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
* so do NOT use pRes->code to determine if the getTableMeta function * so do NOT use pRes->code to determine if the getTableMeta function
......
...@@ -190,7 +190,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -190,7 +190,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (!pInfo->valid) { if (!pInfo->valid) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), pInfo->pzErrMsg); return tscSQLSyntaxErrMsg(tscGetErrorMsgPayload(pCmd), NULL, pInfo->pzErrMsg);
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex);
......
...@@ -276,8 +276,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -276,8 +276,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
} }
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
int32_t cmd = pCmd->command; int32_t cmd = pCmd->command;
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
...@@ -302,7 +300,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -302,7 +300,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
taosMsleep(duration); taosMsleep(duration);
} }
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); rpcMsg->code = tscRenewTableMeta(pSql, 0);
// if there is an error occurring, proceed to the following error handling procedure. // if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
...@@ -2202,14 +2200,14 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create ...@@ -2202,14 +2200,14 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
/** /**
* retrieve table meta from mnode, and update the local table meta cache. * retrieve table meta from mnode, and update the local table meta cache.
* @param pSql sql object * @param pSql sql object
* @param tableId table full name * @param tableIndex table index
* @return status code * @return status code
*/ */
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) { int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
......
...@@ -597,11 +597,12 @@ int taos_errno(TAOS_RES *tres) { ...@@ -597,11 +597,12 @@ int taos_errno(TAOS_RES *tres) {
} }
/* /*
* In case of invalid sql error, additional information is attached to explain * In case of invalid sql/sql syntax error, additional information is attached to explain
* why the sql is invalid * why the sql is invalid
*/ */
static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
if (code != TSDB_CODE_TSC_INVALID_SQL) { if (code != TSDB_CODE_TSC_INVALID_SQL
&& code != TSDB_CODE_TSC_SQL_SYNTAX_ERROR) {
return false; return false;
} }
...@@ -609,9 +610,11 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { ...@@ -609,9 +610,11 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
char *z = NULL; char *z = NULL;
if (len > 0) { if (len > 0) {
z = strstr(pCmd->payload, "invalid SQL"); z = strstr(pCmd->payload, "invalid SQL");
if (z == NULL) {
z = strstr(pCmd->payload, "syntax error");
}
} }
return z != NULL; return z != NULL;
} }
...@@ -817,7 +820,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -817,7 +820,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) { static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
// must before clean the sqlcmd object // must before clean the sqlcmd object
tscResetSqlCmdObj(&pSql->cmd); tscResetSqlCmdObj(&pSql->cmd, false);
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo); static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache); static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) { SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
if (pTagCond->pCond == NULL) { if (pTagCond->pCond == NULL) {
return NULL; return NULL;
} }
...@@ -294,7 +294,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { ...@@ -294,7 +294,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
} }
static void tscFreeQueryInfo(SSqlCmd* pCmd) { static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
if (pCmd == NULL || pCmd->numOfClause == 0) { if (pCmd == NULL || pCmd->numOfClause == 0) {
return; return;
} }
...@@ -304,7 +304,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) { ...@@ -304,7 +304,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
freeQueryInfoImpl(pQueryInfo); freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false); clearAllTableMetaInfo(pQueryInfo, (const char*)addr, removeFromCache);
taosTFree(pQueryInfo); taosTFree(pQueryInfo);
} }
...@@ -312,7 +312,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) { ...@@ -312,7 +312,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) {
taosTFree(pCmd->pQueryInfo); taosTFree(pCmd->pQueryInfo);
} }
void tscResetSqlCmdObj(SSqlCmd* pCmd) { void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
pCmd->command = 0; pCmd->command = 0;
pCmd->numOfCols = 0; pCmd->numOfCols = 0;
pCmd->count = 0; pCmd->count = 0;
...@@ -326,7 +326,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) { ...@@ -326,7 +326,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeQueryInfo(pCmd); tscFreeQueryInfo(pCmd, removeFromCache);
} }
void tscFreeSqlResult(SSqlObj* pSql) { void tscFreeSqlResult(SSqlObj* pSql) {
...@@ -364,7 +364,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { ...@@ -364,7 +364,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
pSql->numOfSubs = 0; pSql->numOfSubs = 0;
tscResetSqlCmdObj(pCmd); tscResetSqlCmdObj(pCmd, false);
} }
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
...@@ -2040,10 +2040,37 @@ bool tscIsUpdateQuery(SSqlObj* pSql) { ...@@ -2040,10 +2040,37 @@ bool tscIsUpdateQuery(SSqlObj* pSql) {
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_USE_DB == pCmd->command); return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_USE_DB == pCmd->command);
} }
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql) {
const char* msgFormat1 = "syntax error near \'%s\'";
const char* msgFormat2 = "syntax error near \'%s\' (%s)";
const char* msgFormat3 = "%s";
const char* prefix = "syntax error";
const int32_t BACKWARD_CHAR_STEP = 0;
if (sql == NULL) {
assert(additionalInfo != NULL);
sprintf(msg, msgFormat1, additionalInfo);
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
char buf[64] = {0}; // only extract part of sql string
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
if (additionalInfo != NULL) {
sprintf(msg, msgFormat2, buf, additionalInfo);
} else {
const char* msgFormat = (0 == strncmp(sql, prefix, strlen(prefix))) ? msgFormat3 : msgFormat1;
sprintf(msg, msgFormat, buf);
}
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) { int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
const char* msgFormat1 = "invalid SQL: %s"; const char* msgFormat1 = "invalid SQL: %s";
const char* msgFormat2 = "invalid SQL: syntax error near \"%s\" (%s)"; const char* msgFormat2 = "invalid SQL: \'%s\' (%s)";
const char* msgFormat3 = "invalid SQL: syntax error near \"%s\""; const char* msgFormat3 = "invalid SQL: \'%s\'";
const int32_t BACKWARD_CHAR_STEP = 0; const int32_t BACKWARD_CHAR_STEP = 0;
...@@ -2269,4 +2296,4 @@ bool tscSetSqlOwner(SSqlObj* pSql) { ...@@ -2269,4 +2296,4 @@ bool tscSetSqlOwner(SSqlObj* pSql) {
void tscClearSqlOwner(SSqlObj* pSql) { void tscClearSqlOwner(SSqlObj* pSql) {
assert(taosCheckPthreadValid(pSql->owner)); assert(taosCheckPthreadValid(pSql->owner));
atomic_store_64(&pSql->owner, 0); atomic_store_64(&pSql->owner, 0);
} }
\ No newline at end of file
...@@ -98,6 +98,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in ...@@ -98,6 +98,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax errr in SQL")
// mnode // mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed")
......
...@@ -38,9 +38,9 @@ static tFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) ...@@ -38,9 +38,9 @@ static tFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i); SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i);
tFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); tFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId);
memcpy(buffer->data + offset, pg->data, pg->num * pMemBucket->bytes); memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += pg->num * pMemBucket->bytes; offset += (int32_t)(pg->num * pMemBucket->bytes);
} }
qsort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn); qsort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn);
...@@ -142,7 +142,7 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { ...@@ -142,7 +142,7 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) {
index = delta % pBucket->numOfSlots; index = delta % pBucket->numOfSlots;
} else { } else {
double slotSpan = (double)span / pBucket->numOfSlots; double slotSpan = (double)span / pBucket->numOfSlots;
index = (v - pBucket->range.i64MinVal) / slotSpan; index = (int32_t)((v - pBucket->range.i64MinVal) / slotSpan);
if (v == pBucket->range.i64MaxVal) { if (v == pBucket->range.i64MaxVal) {
index -= 1; index -= 1;
} }
...@@ -186,7 +186,7 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { ...@@ -186,7 +186,7 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
index = (delta % pBucket->numOfSlots); index = (delta % pBucket->numOfSlots);
} else { } else {
double slotSpan = (double)span / pBucket->numOfSlots; double slotSpan = (double)span / pBucket->numOfSlots;
index = (v - pBucket->range.iMinVal) / slotSpan; index = (int32_t)((v - pBucket->range.iMinVal) / slotSpan);
if (v == pBucket->range.iMaxVal) { if (v == pBucket->range.iMaxVal) {
index -= 1; index -= 1;
} }
...@@ -216,7 +216,7 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { ...@@ -216,7 +216,7 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
index = (delta % pBucket->numOfSlots); index = (delta % pBucket->numOfSlots);
} else { } else {
double slotSpan = span / pBucket->numOfSlots; double slotSpan = span / pBucket->numOfSlots;
index = (v - pBucket->range.dMinVal) / slotSpan; index = (int32_t)((v - pBucket->range.dMinVal) / slotSpan);
if (v == pBucket->range.dMaxVal) { if (v == pBucket->range.dMaxVal) {
index -= 1; index -= 1;
} }
...@@ -397,7 +397,7 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { ...@@ -397,7 +397,7 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
*/ */
void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
assert(pBucket != NULL && data != NULL && size > 0); assert(pBucket != NULL && data != NULL && size > 0);
pBucket->total += size; pBucket->total += (int32_t)size;
int32_t bytes = pBucket->bytes; int32_t bytes = pBucket->bytes;
...@@ -530,7 +530,7 @@ static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) { ...@@ -530,7 +530,7 @@ static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) {
}; };
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
finalResult = pSlot->range.i64MinVal; finalResult = (double)pSlot->range.i64MinVal;
break; break;
} }
} }
......
...@@ -693,7 +693,7 @@ class DbConnRest(DbConn): ...@@ -693,7 +693,7 @@ class DbConnRest(DbConn):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self._type = self.TYPE_REST self._type = self.TYPE_REST
self._url = "http://localhost:6020/rest/sql" # fixed for now self._url = "http://localhost:6041/rest/sql" # fixed for now
self._result = None self._result = None
def openByType(self): # Open connection def openByType(self): # Open connection
...@@ -1306,6 +1306,7 @@ class DbManager(): ...@@ -1306,6 +1306,7 @@ class DbManager():
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit(2) sys.exit(2)
else: else:
print("Failed to connect to DB, errno = {}, msg: {}".format(Helper.convertErrno(err.errno), err.msg))
raise raise
except BaseException: except BaseException:
print("[=] Unexpected exception") print("[=] Unexpected exception")
...@@ -1910,10 +1911,19 @@ class TaskReadData(StateTransitionTask): ...@@ -1910,10 +1911,19 @@ class TaskReadData(StateTransitionTask):
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)', 'sum(speed)',
'stddev(speed)', 'stddev(speed)',
# SELECTOR functions
'min(speed)', 'min(speed)',
'max(speed)', 'max(speed)',
'first(speed)', 'first(speed)',
'last(speed)']) # TODO: add more from 'top' 'last(speed)',
# 'top(speed)', # TODO: not supported?
# 'bottom(speed)', # TODO: not supported?
# 'percentile(speed, 10)', # TODO: TD-1316
'last_row(speed)',
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
'spread(speed)'
]) # TODO: add more from 'top'
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
None None
]) ])
...@@ -2350,7 +2360,7 @@ class ServiceManagerThread: ...@@ -2350,7 +2360,7 @@ class ServiceManagerThread:
self._thread2.start() self._thread2.start()
# wait for service to start # wait for service to start
for i in range(0, 10): for i in range(0, 100):
time.sleep(1.0) time.sleep(1.0)
# self.procIpcBatch() # don't pump message during start up # self.procIpcBatch() # don't pump message during start up
print("_zz_", end="", flush=True) print("_zz_", end="", flush=True)
...@@ -2358,7 +2368,7 @@ class ServiceManagerThread: ...@@ -2358,7 +2368,7 @@ class ServiceManagerThread:
logger.info("[] TDengine service READY to process requests") logger.info("[] TDengine service READY to process requests")
return # now we've started return # now we've started
# TODO: handle this better? # TODO: handle this better?
self.procIpcBatch(20, True) # display output before cronking out, trim to last 20 msgs, force output self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
raise RuntimeError("TDengine service did not start successfully") raise RuntimeError("TDengine service did not start successfully")
def stop(self): def stop(self):
...@@ -2768,7 +2778,7 @@ class MainExec: ...@@ -2768,7 +2778,7 @@ class MainExec:
try: try:
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
except requests.exceptions.ConnectionError as err: except requests.exceptions.ConnectionError as err:
logger.warning("Failed to open REST connection to DB") logger.warning("Failed to open REST connection to DB: {}".format(err.getMessage()))
# don't raise # don't raise
return ret return ret
......
...@@ -119,7 +119,7 @@ endi ...@@ -119,7 +119,7 @@ endi
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql
print 17-> $system_content print 17-> $system_content
if $system_content != @{"status":"error","code":512,"desc":"invalid SQL: invalid SQL: syntax error near 'used1'"}@ then if $system_content != @{"status":"error","code":534,"desc":"Syntax errr in SQL"}@ then
return -1 return -1
endi endi
...@@ -230,4 +230,4 @@ if $system_content != @{"status":"succ","head":["ts","speed"],"data":[["2017-12- ...@@ -230,4 +230,4 @@ if $system_content != @{"status":"succ","head":["ts","speed"],"data":[["2017-12-
return -1 return -1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册