提交 3ecbc860 编写于 作者: C Cary Xu

Merge branch 'develop' into feature/TD-4666

...@@ -2799,8 +2799,7 @@ static bool isTablenameToken(SStrToken* token) { ...@@ -2799,8 +2799,7 @@ static bool isTablenameToken(SStrToken* token) {
SStrToken tableToken = {0}; SStrToken tableToken = {0};
extractTableNameFromToken(&tmpToken, &tableToken); extractTableNameFromToken(&tmpToken, &tableToken);
return (tmpToken.n == strlen(TSQL_TBNAME_L) && strncasecmp(TSQL_TBNAME_L, tmpToken.z, tmpToken.n) == 0);
return (strncasecmp(TSQL_TBNAME_L, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_TBNAME_L));
} }
static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken* pToken) { static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken* pToken) {
...@@ -2831,8 +2830,11 @@ int32_t doGetColumnIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColum ...@@ -2831,8 +2830,11 @@ int32_t doGetColumnIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColum
if (isTablenameToken(pToken)) { if (isTablenameToken(pToken)) {
pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX; pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX;
} else if (strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) { } else if (strlen(DEFAULT_PRIMARY_TIMESTAMP_COL_NAME) == pToken->n &&
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX; strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) {
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX; // just make runtime happy, need fix java test case InsertSpecialCharacterJniTest
} else if (pToken->n == 0) {
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX; // just make runtime happy, need fix java test case InsertSpecialCharacterJniTest
} else { } else {
// not specify the table name, try to locate the table index by column name // not specify the table name, try to locate the table index by column name
if (pIndex->tableIndex == COLUMN_INDEX_INITIAL_VAL) { if (pIndex->tableIndex == COLUMN_INDEX_INITIAL_VAL) {
...@@ -3397,7 +3399,14 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, ...@@ -3397,7 +3399,14 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
if (IS_NUMERIC_TYPE(pRight->value.nType)) { if (IS_NUMERIC_TYPE(pRight->value.nType)) {
bufLen = 60; bufLen = 60;
} else { } else {
bufLen = pRight->value.nLen + 1; /*
* make memory sanitizer happy;
*/
if (pRight->value.nLen == 0) {
bufLen = pRight->value.nLen + 2;
} else {
bufLen = pRight->value.nLen + 1;
}
} }
if (pExpr->tokenId == TK_LE || pExpr->tokenId == TK_LT) { if (pExpr->tokenId == TK_LE || pExpr->tokenId == TK_LT) {
...@@ -4836,7 +4845,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq ...@@ -4836,7 +4845,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
int32_t type = 0; int32_t type = 0;
if ((ret = getQueryCondExpr(&pSql->cmd, pQueryInfo, pExpr, &condExpr, &type, (*pExpr)->tokenId)) != TSDB_CODE_SUCCESS) { if ((ret = getQueryCondExpr(&pSql->cmd, pQueryInfo, pExpr, &condExpr, &type, (*pExpr)->tokenId)) != TSDB_CODE_SUCCESS) {
return ret; goto PARSE_WHERE_EXIT;
} }
tSqlExprCompact(pExpr); tSqlExprCompact(pExpr);
...@@ -4846,7 +4855,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq ...@@ -4846,7 +4855,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
// 1. check if it is a join query // 1. check if it is a join query
if ((ret = validateJoinExpr(&pSql->cmd, pQueryInfo, &condExpr)) != TSDB_CODE_SUCCESS) { if ((ret = validateJoinExpr(&pSql->cmd, pQueryInfo, &condExpr)) != TSDB_CODE_SUCCESS) {
return ret; goto PARSE_WHERE_EXIT;
} }
// 2. get the query time range // 2. get the query time range
...@@ -5047,7 +5056,8 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo ...@@ -5047,7 +5056,8 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo
size_t numOfFields = tscNumOfFields(pQueryInfo); size_t numOfFields = tscNumOfFields(pQueryInfo);
if (pQueryInfo->fillVal == NULL) { if (pQueryInfo->fillVal == NULL) {
pQueryInfo->fillVal = calloc(numOfFields, sizeof(int64_t)); pQueryInfo->fillVal = calloc(numOfFields, sizeof(int64_t));
pQueryInfo->numOfFillVal = (int32_t)numOfFields;
if (pQueryInfo->fillVal == NULL) { if (pQueryInfo->fillVal == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -7733,11 +7743,18 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -7733,11 +7743,18 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
taosArrayPush(pVgroupList, &t); taosArrayPush(pVgroupList, &t);
} }
STableMeta* pMeta = tscTableMetaDup(pTableMeta); //STableMeta* pMeta = tscTableMetaDup(pTableMeta);
STableMetaVgroupInfo p = { .pTableMeta = pMeta }; //STableMetaVgroupInfo p = { .pTableMeta = pMeta };
//const char* px = tNameGetTableName(pname);
//taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
// avoid mem leak, may should update pTableMeta
const char* px = tNameGetTableName(pname); const char* px = tNameGetTableName(pname);
taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo)); if (taosHashGet(pCmd->pTableMetaMap, px, strlen(px)) == NULL) {
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
STableMetaVgroupInfo p = { .pTableMeta = pMeta, .pVgroupInfo = NULL};
taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
}
} else { // add to the retrieve table meta array list. } else { // add to the retrieve table meta array list.
char* t = strdup(name); char* t = strdup(name);
taosArrayPush(plist, &t); taosArrayPush(plist, &t);
...@@ -8193,7 +8210,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8193,7 +8210,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
// in case of join query, time range is required. // in case of join query, time range is required.
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey); uint64_t timeRange = (uint64_t)pQueryInfo->window.ekey - pQueryInfo->window.skey;
if (timeRange == 0 && pQueryInfo->window.skey == 0) { if (timeRange == 0 && pQueryInfo->window.skey == 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
...@@ -8235,6 +8252,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8235,6 +8252,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
taosArrayAddBatch(pQueryInfo->exprList1, (void*) p, numOfExpr); taosArrayAddBatch(pQueryInfo->exprList1, (void*) p, numOfExpr);
tfree(p);
} }
#if 0 #if 0
......
...@@ -13,7 +13,10 @@ ...@@ -13,7 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <tscompression.h>
#include "os.h" #include "os.h"
#include "qPlan.h"
#include "qTableMeta.h"
#include "tcmdtype.h" #include "tcmdtype.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "trpc.h" #include "trpc.h"
...@@ -21,10 +24,8 @@ ...@@ -21,10 +24,8 @@
#include "tscLog.h" #include "tscLog.h"
#include "tscProfile.h" #include "tscProfile.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "qTableMeta.h"
#include "tsclient.h" #include "tsclient.h"
#include "ttimer.h" #include "ttimer.h"
#include "qPlan.h"
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
...@@ -2048,24 +2049,38 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2048,24 +2049,38 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
} }
SSqlCmd *pParentCmd = &pParentSql->cmd; SSqlCmd *pParentCmd = &pParentSql->cmd;
SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
char* pMsg = pMultiMeta->meta; char* pMsg = pMultiMeta->meta;
char* buf = NULL;
if (pMultiMeta->compressed) {
buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta));
int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
buf, pMultiMeta->rawLen - sizeof(SMultiTableMeta), ONE_STAGE_COMP, NULL, 0);
assert(len == pMultiMeta->rawLen - sizeof(SMultiTableMeta));
pMsg = buf;
}
for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) { for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg; STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg;
int32_t code = tableMetaMsgConvert(pMetaMsg); int32_t code = tableMetaMsgConvert(pMetaMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosHashCleanup(pSet); taosHashCleanup(pSet);
taosReleaseRef(tscObjRef, pParentSql->self); taosReleaseRef(tscObjRef, pParentSql->self);
tfree(buf);
return code; return code;
} }
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg); STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) { if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) {
tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname); tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname);
tfree(pTableMeta);
taosHashCleanup(pSet); taosHashCleanup(pSet);
taosReleaseRef(tscObjRef, pParentSql->self); taosReleaseRef(tscObjRef, pParentSql->self);
tfree(buf);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
...@@ -2105,6 +2120,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2105,6 +2120,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
assert(p != NULL); assert(p != NULL);
int32_t size = 0; int32_t size = 0;
if (p->pVgroupInfo!= NULL) {
tscVgroupInfoClear(p->pVgroupInfo);
//tfree(p->pTableMeta);
}
p->pVgroupInfo = createVgroupInfoFromMsg(pMsg, &size, pSql->self); p->pVgroupInfo = createVgroupInfoFromMsg(pMsg, &size, pSql->self);
pMsg += size; pMsg += size;
} }
...@@ -2115,6 +2134,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2115,6 +2134,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
taosHashCleanup(pSet); taosHashCleanup(pSet);
taosReleaseRef(tscObjRef, pParentSql->self); taosReleaseRef(tscObjRef, pParentSql->self);
tfree(buf);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -948,8 +948,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -948,8 +948,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
pSql->fp = NULL; // todo set the correct callback function pointer
pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t length = (int32_t)strlen(tableNameList); int32_t length = (int32_t)strlen(tableNameList);
......
...@@ -107,6 +107,9 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { ...@@ -107,6 +107,9 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
subState->states[idx] = 1; subState->states[idx] = 1;
bool done = allSubqueryDone(pParentSql); bool done = allSubqueryDone(pParentSql);
if (!done) {
tscDebug("0x%"PRIx64" sub:%p,%d completed, total:%d", pParentSql->self, pSql, idx, pParentSql->subState.numOfSub);
}
pthread_mutex_unlock(&subState->mutex); pthread_mutex_unlock(&subState->mutex);
return done; return done;
} }
...@@ -416,7 +419,9 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { ...@@ -416,7 +419,9 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
} }
// tscFieldInfoClear(&pSupporter->fieldsInfo); // tscFieldInfoClear(&pSupporter->fieldsInfo);
if (pSupporter->fieldsInfo.internalField != NULL) {
taosArrayDestroy(pSupporter->fieldsInfo.internalField);
}
if (pSupporter->pTSBuf != NULL) { if (pSupporter->pTSBuf != NULL) {
tsBufDestroy(pSupporter->pTSBuf); tsBufDestroy(pSupporter->pTSBuf);
pSupporter->pTSBuf = NULL; pSupporter->pTSBuf = NULL;
...@@ -430,7 +435,8 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { ...@@ -430,7 +435,8 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
} }
if (pSupporter->pVgroupTables != NULL) { if (pSupporter->pVgroupTables != NULL) {
taosArrayDestroy(pSupporter->pVgroupTables); //taosArrayDestroy(pSupporter->pVgroupTables);
tscFreeVgroupTableInfo(pSupporter->pVgroupTables);
pSupporter->pVgroupTables = NULL; pSupporter->pVgroupTables = NULL;
} }
...@@ -889,7 +895,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -889,7 +895,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
tscDebug("Join %d - num:%d", i, p->num); tscDebug("Join %d - num:%d", i, p->num);
// sort according to the tag valu // sort according to the tag valu
qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar); if (p->pIdTagList != NULL) {
qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar);
}
if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) { if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) {
for (int32_t j = 0; j <= i; j++) { for (int32_t j = 0; j <= i; j++) {
...@@ -1173,7 +1181,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1173,7 +1181,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// no data exists in next vnode, mark the <tid, tags> query completed // no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets. // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); //tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
return; return;
} }
...@@ -1441,7 +1449,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1441,7 +1449,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub); //tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub);
return; return;
} }
...@@ -3047,9 +3055,10 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3047,9 +3055,10 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj->cmd.insertParam.schemaAttached = 1; pParentObj->cmd.insertParam.schemaAttached = 1;
} }
} }
if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) { if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
tscDebug("0x%"PRIx64" insert:%p,%d completed, total:%d", pParentObj->self, tres, pSupporter->index, pParentObj->subState.numOfSub); // concurrency problem, other thread already release pParentObj
//tscDebug("0x%"PRIx64" insert:%p,%d completed, total:%d", pParentObj->self, tres, suppIdx, pParentObj->subState.numOfSub);
return; return;
} }
......
...@@ -297,7 +297,7 @@ bool tscHasColumnFilter(SQueryInfo* pQueryInfo) { ...@@ -297,7 +297,7 @@ bool tscHasColumnFilter(SQueryInfo* pQueryInfo) {
size_t size = taosArrayGetSize(pQueryInfo->colList); size_t size = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SColumn* pCol = taosArrayGet(pQueryInfo->colList, i); SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
if (pCol->info.flist.numOfFilters > 0) { if (pCol->info.flist.numOfFilters > 0) {
return true; return true;
} }
...@@ -3096,6 +3096,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -3096,6 +3096,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->tsBuf = NULL; pQueryInfo->tsBuf = NULL;
pQueryInfo->fillType = pSrc->fillType; pQueryInfo->fillType = pSrc->fillType;
pQueryInfo->fillVal = NULL; pQueryInfo->fillVal = NULL;
pQueryInfo->numOfFillVal = 0;;
pQueryInfo->clauseLimit = pSrc->clauseLimit; pQueryInfo->clauseLimit = pSrc->clauseLimit;
pQueryInfo->prjOffset = pSrc->prjOffset; pQueryInfo->prjOffset = pSrc->prjOffset;
pQueryInfo->numOfTables = 0; pQueryInfo->numOfTables = 0;
...@@ -3131,11 +3132,12 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -3131,11 +3132,12 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
} }
if (pSrc->fillType != TSDB_FILL_NONE) { if (pSrc->fillType != TSDB_FILL_NONE) {
pQueryInfo->fillVal = malloc(pSrc->fieldsInfo.numOfOutput * sizeof(int64_t)); pQueryInfo->fillVal = calloc(1, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t));
if (pQueryInfo->fillVal == NULL) { if (pQueryInfo->fillVal == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pQueryInfo->numOfFillVal = pSrc->fieldsInfo.numOfOutput;
memcpy(pQueryInfo->fillVal, pSrc->fillVal, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t)); memcpy(pQueryInfo->fillVal, pSrc->fillVal, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t));
} }
...@@ -3477,6 +3479,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3477,6 +3479,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->tsBuf = NULL; pNewQueryInfo->tsBuf = NULL;
pNewQueryInfo->fillType = pQueryInfo->fillType; pNewQueryInfo->fillType = pQueryInfo->fillType;
pNewQueryInfo->fillVal = NULL; pNewQueryInfo->fillVal = NULL;
pNewQueryInfo->numOfFillVal = 0;
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
pNewQueryInfo->prjOffset = pQueryInfo->prjOffset; pNewQueryInfo->prjOffset = pQueryInfo->prjOffset;
pNewQueryInfo->numOfTables = 0; pNewQueryInfo->numOfTables = 0;
...@@ -3507,11 +3510,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3507,11 +3510,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
} }
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); //just make memory memory sanitizer happy
//refator later
pNewQueryInfo->fillVal = calloc(1, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
if (pNewQueryInfo->fillVal == NULL) { if (pNewQueryInfo->fillVal == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pNewQueryInfo->numOfFillVal = pQueryInfo->fieldsInfo.numOfOutput;
memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
} }
...@@ -4531,7 +4537,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4531,7 +4537,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
if (pQueryAttr->fillType != TSDB_FILL_NONE) { if (pQueryAttr->fillType != TSDB_FILL_NONE) {
pQueryAttr->fillVal = calloc(pQueryAttr->numOfOutput, sizeof(int64_t)); pQueryAttr->fillVal = calloc(pQueryAttr->numOfOutput, sizeof(int64_t));
memcpy(pQueryAttr->fillVal, pQueryInfo->fillVal, pQueryAttr->numOfOutput * sizeof(int64_t)); memcpy(pQueryAttr->fillVal, pQueryInfo->fillVal, pQueryInfo->numOfFillVal * sizeof(int64_t));
} }
pQueryAttr->srcRowSize = 0; pQueryAttr->srcRowSize = 0;
......
...@@ -18,7 +18,58 @@ ...@@ -18,7 +18,58 @@
#include "ttype.h" #include "ttype.h"
#include "tutil.h" #include "tutil.h"
#include "tarithoperator.h" #include "tarithoperator.h"
#include "tcompare.h"
//GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i]));
#define ARRAY_LIST_OP_DIV(left, right, _left_type, _right_type, len1, len2, out, op, _res_type, _ord) \
{ \
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : MAX(len1, len2) - 1; \
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; \
\
if ((len1) == (len2)) { \
for (; i < (len2) && i >= 0; i += step, (out) += 1) { \
if (isNull((char *)&((left)[i]), _left_type) || isNull((char *)&((right)[i]), _right_type)) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[i] op(right)[i]; \
} \
} else if ((len1) == 1) { \
for (; i >= 0 && i < (len2); i += step, (out) += 1) { \
if (isNull((char *)(left), _left_type) || isNull((char *)&(right)[i], _right_type)) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[0] op(right)[i]; \
} \
} else if ((len2) == 1) { \
for (; i >= 0 && i < (len1); i += step, (out) += 1) { \
if (isNull((char *)&(left)[i], _left_type) || isNull((char *)(right), _right_type)) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[0])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[i] op(right)[0]; \
} \
} \
}
#define ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, op, _res_type, _ord) \ #define ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, op, _res_type, _ord) \
{ \ { \
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : MAX(len1, len2) - 1; \ int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : MAX(len1, len2) - 1; \
...@@ -62,6 +113,12 @@ ...@@ -62,6 +113,12 @@
SET_DOUBLE_NULL(out); \ SET_DOUBLE_NULL(out); \
continue; \ continue; \
} \ } \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[i] - ((int64_t)(((double)(left)[i]) / (right)[i])) * (right)[i]; \ *(out) = (double)(left)[i] - ((int64_t)(((double)(left)[i]) / (right)[i])) * (right)[i]; \
} \ } \
} else if (len1 == 1) { \ } else if (len1 == 1) { \
...@@ -70,6 +127,12 @@ ...@@ -70,6 +127,12 @@
SET_DOUBLE_NULL(out); \ SET_DOUBLE_NULL(out); \
continue; \ continue; \
} \ } \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[0] - ((int64_t)(((double)(left)[0]) / (right)[i])) * (right)[i]; \ *(out) = (double)(left)[0] - ((int64_t)(((double)(left)[0]) / (right)[i])) * (right)[i]; \
} \ } \
} else if ((len2) == 1) { \ } else if ((len2) == 1) { \
...@@ -78,6 +141,12 @@ ...@@ -78,6 +141,12 @@
SET_DOUBLE_NULL(out); \ SET_DOUBLE_NULL(out); \
continue; \ continue; \
} \ } \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[0])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[i] - ((int64_t)(((double)(left)[i]) / (right)[0])) * (right)[0]; \ *(out) = (double)(left)[i] - ((int64_t)(((double)(left)[i]) / (right)[0])) * (right)[0]; \
} \ } \
} \ } \
...@@ -90,7 +159,7 @@ ...@@ -90,7 +159,7 @@
#define ARRAY_LIST_MULTI(left, right, _left_type, _right_type, len1, len2, out, _ord) \ #define ARRAY_LIST_MULTI(left, right, _left_type, _right_type, len1, len2, out, _ord) \
ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, *, TSDB_DATA_TYPE_DOUBLE, _ord) ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, *, TSDB_DATA_TYPE_DOUBLE, _ord)
#define ARRAY_LIST_DIV(left, right, _left_type, _right_type, len1, len2, out, _ord) \ #define ARRAY_LIST_DIV(left, right, _left_type, _right_type, len1, len2, out, _ord) \
ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, /, TSDB_DATA_TYPE_DOUBLE, _ord) ARRAY_LIST_OP_DIV(left, right, _left_type, _right_type, len1, len2, out, /, TSDB_DATA_TYPE_DOUBLE, _ord)
#define ARRAY_LIST_REM(left, right, _left_type, _right_type, len1, len2, out, _ord) \ #define ARRAY_LIST_REM(left, right, _left_type, _right_type, len1, len2, out, _ord) \
ARRAY_LIST_OP_REM(left, right, _left_type, _right_type, len1, len2, out, %, TSDB_DATA_TYPE_DOUBLE, _ord) ARRAY_LIST_OP_REM(left, right, _left_type, _right_type, len1, len2, out, %, TSDB_DATA_TYPE_DOUBLE, _ord)
......
此差异已折叠。
...@@ -2,6 +2,7 @@ package com.taosdata.jdbc; ...@@ -2,6 +2,7 @@ package com.taosdata.jdbc;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
public abstract class AbstractStatement extends WrapperImpl implements Statement { public abstract class AbstractStatement extends WrapperImpl implements Statement {
...@@ -196,13 +197,44 @@ public abstract class AbstractStatement extends WrapperImpl implements Statement ...@@ -196,13 +197,44 @@ public abstract class AbstractStatement extends WrapperImpl implements Statement
if (batchedArgs == null || batchedArgs.isEmpty()) if (batchedArgs == null || batchedArgs.isEmpty())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_BATCH_IS_EMPTY); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_BATCH_IS_EMPTY);
String clientInfo = getConnection().getClientInfo(TSDBDriver.PROPERTY_KEY_BATCH_ERROR_IGNORE);
boolean batchErrorIgnore = clientInfo == null ? TSDBConstants.DEFAULT_BATCH_ERROR_IGNORE : Boolean.parseBoolean(clientInfo);
if (batchErrorIgnore) {
return executeBatchIgnoreException();
}
return executeBatchThrowException();
}
private int[] executeBatchIgnoreException() {
return batchedArgs.stream().mapToInt(sql -> {
try {
boolean isSelect = execute(sql);
if (isSelect) {
return SUCCESS_NO_INFO;
} else {
return getUpdateCount();
}
} catch (SQLException e) {
return EXECUTE_FAILED;
}
}).toArray();
}
private int[] executeBatchThrowException() throws BatchUpdateException {
int[] res = new int[batchedArgs.size()]; int[] res = new int[batchedArgs.size()];
for (int i = 0; i < batchedArgs.size(); i++) { for (int i = 0; i < batchedArgs.size(); i++) {
boolean isSelect = execute(batchedArgs.get(i)); try {
if (isSelect) { boolean isSelect = execute(batchedArgs.get(i));
res[i] = SUCCESS_NO_INFO; if (isSelect) {
} else { res[i] = SUCCESS_NO_INFO;
res[i] = getUpdateCount(); } else {
res[i] = getUpdateCount();
}
} catch (SQLException e) {
String reason = e.getMessage();
int[] updateCounts = Arrays.copyOfRange(res, 0, i);
throw new BatchUpdateException(reason, updateCounts, e);
} }
} }
return res; return res;
......
...@@ -74,6 +74,8 @@ public abstract class TSDBConstants { ...@@ -74,6 +74,8 @@ public abstract class TSDBConstants {
public static final String DEFAULT_PRECISION = "ms"; public static final String DEFAULT_PRECISION = "ms";
public static final boolean DEFAULT_BATCH_ERROR_IGNORE = false;
public static int typeName2JdbcType(String type) { public static int typeName2JdbcType(String type) {
switch (type.toUpperCase()) { switch (type.toUpperCase()) {
case "TIMESTAMP": case "TIMESTAMP":
......
...@@ -100,6 +100,11 @@ public class TSDBDriver extends AbstractDriver { ...@@ -100,6 +100,11 @@ public class TSDBDriver extends AbstractDriver {
*/ */
public static final String PROPERTY_KEY_TIMESTAMP_FORMAT = "timestampFormat"; public static final String PROPERTY_KEY_TIMESTAMP_FORMAT = "timestampFormat";
/**
* continue process commands in executeBatch
*/
public static final String PROPERTY_KEY_BATCH_ERROR_IGNORE = "batchErrorIgnore";
private TSDBDatabaseMetaData dbMetaData = null; private TSDBDatabaseMetaData dbMetaData = null;
static { static {
......
...@@ -841,13 +841,13 @@ public class TSDBPreparedStatementTest { ...@@ -841,13 +841,13 @@ public class TSDBPreparedStatementTest {
} }
@Test @Test
public void setBytes() throws SQLException, IOException { public void setBytes() throws SQLException {
// given // given
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
byte[] f8 = "{\"name\": \"john\", \"age\": 10, \"address\": \"192.168.1.100\"}".getBytes(); byte[] f8 = "{\"name\": \"john\", \"age\": 10, \"address\": \"192.168.1.100\"}".getBytes();
// when // when
pstmt_insert.setTimestamp(1, new Timestamp(System.currentTimeMillis())); pstmt_insert.setTimestamp(1, new Timestamp(ts));
pstmt_insert.setBytes(9, f8); pstmt_insert.setBytes(9, f8);
int result = pstmt_insert.executeUpdate(); int result = pstmt_insert.executeUpdate();
......
package com.taosdata.jdbc.cases;
import org.junit.*;
import java.sql.*;
import java.util.stream.IntStream;
public class BatchErrorIgnoreTest {
private static final String host = "127.0.0.1";
@Test
public void batchErrorThrowException() throws SQLException {
// given
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata");
// when
try (Statement stmt = conn.createStatement()) {
IntStream.range(1, 6).mapToObj(i -> "insert into test.t" + i + " values(now, " + i + ")").forEach(sql -> {
try {
stmt.addBatch(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.addBatch("insert into t11 values(now, 11)");
IntStream.range(6, 11).mapToObj(i -> "insert into test.t" + i + " values(now, " + i + "),(now + 1s, " + (10 * i) + ")").forEach(sql -> {
try {
stmt.addBatch(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.addBatch("select count(*) from test.weather");
stmt.executeBatch();
} catch (BatchUpdateException e) {
int[] updateCounts = e.getUpdateCounts();
Assert.assertEquals(5, updateCounts.length);
Assert.assertEquals(1, updateCounts[0]);
Assert.assertEquals(1, updateCounts[1]);
Assert.assertEquals(1, updateCounts[2]);
Assert.assertEquals(1, updateCounts[3]);
Assert.assertEquals(1, updateCounts[4]);
}
}
@Test
public void batchErrorIgnore() throws SQLException {
// given
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata&batchErrorIgnore=true");
// when
int[] results = null;
try (Statement stmt = conn.createStatement()) {
IntStream.range(1, 6).mapToObj(i -> "insert into test.t" + i + " values(now, " + i + ")").forEach(sql -> {
try {
stmt.addBatch(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.addBatch("insert into t11 values(now, 11)");
IntStream.range(6, 11).mapToObj(i -> "insert into test.t" + i + " values(now, " + i + "),(now + 1s, " + (10 * i) + ")").forEach(sql -> {
try {
stmt.addBatch(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.addBatch("select count(*) from test.weather");
results = stmt.executeBatch();
} catch (SQLException e) {
e.printStackTrace();
}
// then
assert results != null;
Assert.assertEquals(12, results.length);
Assert.assertEquals(1, results[0]);
Assert.assertEquals(1, results[1]);
Assert.assertEquals(1, results[2]);
Assert.assertEquals(1, results[3]);
Assert.assertEquals(1, results[4]);
Assert.assertEquals(Statement.EXECUTE_FAILED, results[5]);
Assert.assertEquals(2, results[6]);
Assert.assertEquals(2, results[7]);
Assert.assertEquals(2, results[8]);
Assert.assertEquals(2, results[9]);
Assert.assertEquals(2, results[10]);
Assert.assertEquals(Statement.SUCCESS_NO_INFO, results[11]);
}
@Before
public void before() {
try {
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("use test");
stmt.execute("drop table if exists weather");
stmt.execute("create table weather (ts timestamp, f1 float) tags(t1 int)");
IntStream.range(1, 11).mapToObj(i -> "create table t" + i + " using weather tags(" + i + ")").forEach(sql -> {
try {
stmt.execute(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
@BeforeClass
public static void beforeClass() {
try {
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists test");
stmt.execute("create database if not exists test");
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
@AfterClass
public static void afterClass() {
try {
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists test");
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
...@@ -202,12 +202,12 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -202,12 +202,12 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite, dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead.msgType], qtypeStr[qtype], pWrite->pHead.version); pWrite->rpcMsg.ahandle, taosMsg[pWrite->walHead.msgType], qtypeStr[qtype], pWrite->walHead.version);
pWrite->code = vnodeProcessWrite(pVnode, &pWrite->pHead, qtype, pWrite); pWrite->code = vnodeProcessWrite(pVnode, &pWrite->walHead, qtype, pWrite);
if (pWrite->code <= 0) atomic_add_fetch_32(&pWrite->processedCount, 1); if (pWrite->code <= 0) atomic_add_fetch_32(&pWrite->processedCount, 1);
if (pWrite->code > 0) pWrite->code = 0; if (pWrite->code > 0) pWrite->code = 0;
if (pWrite->code == 0 && pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; if (pWrite->code == 0 && pWrite->walHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
dTrace("msg:%p is processed in vwrite queue, code:0x%x", pWrite, pWrite->code); dTrace("msg:%p is processed in vwrite queue, code:0x%x", pWrite, pWrite->code);
} }
...@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else { } else {
if (qtype == TAOS_QTYPE_FWD) { if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead.version, pWrite->code, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); vnodeConfirmForward(pVnode, pWrite->walHead.version, pWrite->code, pWrite->walHead.msgType != TSDB_MSG_TYPE_SUBMIT);
} }
if (pWrite->rspRet.rsp) { if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp); rpcFreeCont(pWrite->rspRet.rsp);
......
...@@ -395,6 +395,8 @@ int32_t* taosGetErrno(); ...@@ -395,6 +395,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find") #define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find")
#define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string") #define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string")
#define TSDB_CODE_HTTP_REQUEST_JSON_ERROR TAOS_DEF_ERROR_CODE(0, 0x1F00) //"http request json error")
// odbc // odbc
#define TSDB_CODE_ODBC_OOM TAOS_DEF_ERROR_CODE(0, 0x2100) //"out of memory") #define TSDB_CODE_ODBC_OOM TAOS_DEF_ERROR_CODE(0, 0x2100) //"out of memory")
#define TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM TAOS_DEF_ERROR_CODE(0, 0x2101) //"convertion not a valid literal input") #define TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM TAOS_DEF_ERROR_CODE(0, 0x2101) //"convertion not a valid literal input")
......
...@@ -760,10 +760,12 @@ typedef struct STableMetaMsg { ...@@ -760,10 +760,12 @@ typedef struct STableMetaMsg {
} STableMetaMsg; } STableMetaMsg;
typedef struct SMultiTableMeta { typedef struct SMultiTableMeta {
int32_t numOfTables; int32_t numOfTables;
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t contLen; uint32_t contLen:31;
char meta[]; uint8_t compressed:1; // denote if compressed or not
uint32_t rawLen; // size before compress
char meta[];
} SMultiTableMeta; } SMultiTableMeta;
typedef struct { typedef struct {
......
...@@ -27,7 +27,7 @@ typedef struct { ...@@ -27,7 +27,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
FCqWrite cqWrite; FCqWrite cqWrite;
} SCqCfg; } SCqCfg;
......
...@@ -49,7 +49,7 @@ typedef struct { ...@@ -49,7 +49,7 @@ typedef struct {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
SRspRet rspRet; SRspRet rspRet;
char reserveForSync[24]; char reserveForSync[24];
SWalHead pHead; SWalHead walHead;
} SVWriteMsg; } SVWriteMsg;
// vnodeStatus // vnodeStatus
......
...@@ -569,7 +569,7 @@ SArguments g_args = { ...@@ -569,7 +569,7 @@ SArguments g_args = {
0, // test_mode 0, // test_mode
"127.0.0.1", // host "127.0.0.1", // host
6030, // port 6030, // port
TAOSC_IFACE, // iface INTERFACE_BUT, // iface
"root", // user "root", // user
#ifdef _TD_POWER_ #ifdef _TD_POWER_
"powerdb", // password "powerdb", // password
...@@ -1429,8 +1429,13 @@ static int printfInsertMeta() { ...@@ -1429,8 +1429,13 @@ static int printfInsertMeta() {
else else
printf("\ntaosdemo is simulating random data as you request..\n\n"); printf("\ntaosdemo is simulating random data as you request..\n\n");
printf("interface: \033[33m%s\033[0m\n", if (g_args.iface != INTERFACE_BUT) {
(g_args.iface==TAOSC_IFACE)?"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); // first time if no iface specified
printf("interface: \033[33m%s\033[0m\n",
(g_args.iface==TAOSC_IFACE)?"taosc":
(g_args.iface==REST_IFACE)?"rest":"stmt");
}
printf("host: \033[33m%s:%u\033[0m\n", printf("host: \033[33m%s:%u\033[0m\n",
g_Dbs.host, g_Dbs.port); g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
...@@ -5038,13 +5043,17 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) ...@@ -5038,13 +5043,17 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
uint16_t iface; uint16_t iface;
if (superTblInfo) if (superTblInfo)
iface = superTblInfo->iface; iface = superTblInfo->iface;
else else {
iface = g_args.iface; if (g_args.iface == INTERFACE_BUT)
iface = TAOSC_IFACE;
else
iface = g_args.iface;
}
debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, __func__, __LINE__,
(g_args.iface==TAOSC_IFACE)? (iface==TAOSC_IFACE)?
"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); "taosc":(iface==REST_IFACE)?"rest":"stmt");
switch(iface) { switch(iface) {
case TAOSC_IFACE: case TAOSC_IFACE:
...@@ -5884,7 +5893,7 @@ static void printStatPerThread(threadInfo *pThreadInfo) ...@@ -5884,7 +5893,7 @@ static void printStatPerThread(threadInfo *pThreadInfo)
pThreadInfo->threadID, pThreadInfo->threadID,
pThreadInfo->totalInsertRows, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows, pThreadInfo->totalAffectedRows,
(double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0))); (pThreadInfo->totalDelay)?(double)((pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay)/1000.0)): FLT_MAX);
} }
// sync write interlace data // sync write interlace data
...@@ -6463,7 +6472,7 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * ...@@ -6463,7 +6472,7 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
} }
static void startMultiThreadInsertData(int threads, char* db_name, static void startMultiThreadInsertData(int threads, char* db_name,
char* precision,SSuperTable* superTblInfo) { char* precision, SSuperTable* superTblInfo) {
int32_t timePrec = TSDB_TIME_PRECISION_MILLI; int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) { if (0 != precision[0]) {
...@@ -7936,7 +7945,12 @@ static void setParaFromArg(){ ...@@ -7936,7 +7945,12 @@ static void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
g_args.tb_prefix, TSDB_TABLE_NAME_LEN - 20); g_args.tb_prefix, TSDB_TABLE_NAME_LEN - 20);
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].iface = g_args.iface;
if (g_args.iface == INTERFACE_BUT) {
g_Dbs.db[0].superTbls[0].iface = TAOSC_IFACE;
} else {
g_Dbs.db[0].superTbls[0].iface = g_args.iface;
}
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
......
...@@ -104,6 +104,20 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) { ...@@ -104,6 +104,20 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) {
tfree(pTable); tfree(pTable);
} }
static char* mnodeGetTableShowPattern(SShowObj *pShow) {
char* pattern = NULL;
if (pShow != NULL && pShow->payloadLen > 0) {
pattern = (char*)malloc(pShow->payloadLen + 1);
if (pattern == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
memcpy(pattern, pShow->payload, pShow->payloadLen);
pattern[pShow->payloadLen] = 0;
}
return pattern;
}
static int32_t mnodeChildTableActionDestroy(SSdbRow *pRow) { static int32_t mnodeChildTableActionDestroy(SSdbRow *pRow) {
mnodeDestroyChildTable(pRow->pObj); mnodeDestroyChildTable(pRow->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1620,6 +1634,11 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1620,6 +1634,11 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char stableName[TSDB_TABLE_NAME_LEN] = {0}; char stableName[TSDB_TABLE_NAME_LEN] = {0};
char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) {
return 0;
}
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextSuperTable(pShow->pIter, &pTable); pShow->pIter = mnodeGetNextSuperTable(pShow->pIter, &pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
...@@ -1631,7 +1650,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1631,7 +1650,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
memset(stableName, 0, tListLen(stableName)); memset(stableName, 0, tListLen(stableName));
mnodeExtractTableName(pTable->info.tableId, stableName); mnodeExtractTableName(pTable->info.tableId, stableName);
if (pShow->payloadLen > 0 && patternMatch(pShow->payload, stableName, sizeof(stableName) - 1, &info) != TSDB_PATTERN_MATCH) { if (pShow->payloadLen > 0 && patternMatch(pattern, stableName, sizeof(stableName) - 1, &info) != TSDB_PATTERN_MATCH) {
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
} }
...@@ -1671,6 +1690,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1671,6 +1690,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
free(pattern);
return numOfRows; return numOfRows;
} }
...@@ -2892,7 +2912,7 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray ...@@ -2892,7 +2912,7 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray
(*totalMallocLen) *= 2; (*totalMallocLen) *= 2;
} }
pMultiMeta = rpcReallocCont(pMultiMeta, *totalMallocLen); pMultiMeta = realloc(pMultiMeta, *totalMallocLen);
if (pMultiMeta == NULL) { if (pMultiMeta == NULL) {
return NULL; return NULL;
} }
...@@ -2923,8 +2943,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -2923,8 +2943,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
} }
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size // first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); int32_t totalMallocLen = sizeof(SMultiTableMeta) + sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
pMultiMeta = rpcMallocCont(totalMallocLen); pMultiMeta = calloc(1, totalMallocLen);
if (pMultiMeta == NULL) { if (pMultiMeta == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _end; goto _end;
...@@ -2957,7 +2977,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -2957,7 +2977,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
int remain = totalMallocLen - pMultiMeta->contLen; int remain = totalMallocLen - pMultiMeta->contLen;
if (remain <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) { if (remain <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) {
totalMallocLen *= 2; totalMallocLen *= 2;
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen); pMultiMeta = realloc(pMultiMeta, totalMallocLen);
if (pMultiMeta == NULL) { if (pMultiMeta == NULL) {
mnodeDecTableRef(pMsg->pTable); mnodeDecTableRef(pMsg->pTable);
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
...@@ -3027,16 +3047,41 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3027,16 +3047,41 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.len = pMultiMeta->contLen; pMsg->rpcRsp.len = pMultiMeta->contLen;
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
char* tmp = rpcMallocCont(pMultiMeta->contLen + 2);
if (tmp == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _end;
}
int32_t len = tsCompressString(pMultiMeta->meta, (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
tmp + sizeof(SMultiTableMeta), (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta) + 2, ONE_STAGE_COMP, NULL, 0);
pMultiMeta->rawLen = pMultiMeta->contLen;
if (len == -1 || len + sizeof(SMultiTableMeta) >= pMultiMeta->contLen + 2) { // compress failed, do not compress this binary data
pMultiMeta->compressed = 0;
memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta) + pMultiMeta->contLen);
} else {
pMultiMeta->compressed = 1;
pMultiMeta->contLen = sizeof(SMultiTableMeta) + len;
// copy the header and the compressed payload
memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta));
}
pMsg->rpcRsp.rsp = tmp;
pMsg->rpcRsp.len = pMultiMeta->contLen;
SMultiTableMeta* p = (SMultiTableMeta*) tmp;
mDebug("multiTable info build completed, original:%d, compressed:%d, comp:%d", p->rawLen, p->contLen, p->compressed);
_end: _end:
tfree(str); tfree(str);
tfree(nameList); tfree(nameList);
taosArrayDestroy(pList); taosArrayDestroy(pList);
pMsg->pTable = NULL; pMsg->pTable = NULL;
pMsg->pVgroup = NULL; pMsg->pVgroup = NULL;
tfree(pMultiMeta);
if (code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMultiMeta);
}
return code; return code;
} }
...@@ -3132,15 +3177,9 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -3132,15 +3177,9 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
char prefix[64] = {0}; char prefix[64] = {0};
int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64); int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64);
char* pattern = NULL; char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0) { if (pShow->payloadLen > 0 && pattern == NULL) {
pattern = (char*)malloc(pShow->payloadLen + 1); return 0;
if (pattern == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return 0;
}
memcpy(pattern, pShow->payload, pShow->payloadLen);
pattern[pShow->payloadLen] = 0;
} }
while (numOfRows < rows) { while (numOfRows < rows) {
...@@ -3372,6 +3411,11 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3372,6 +3411,11 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix); int32_t prefixLen = (int32_t)strlen(prefix);
char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) {
return 0;
}
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable); pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
...@@ -3387,7 +3431,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3387,7 +3431,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
// pattern compare for table name // pattern compare for table name
mnodeExtractTableName(pTable->info.tableId, tableName); mnodeExtractTableName(pTable->info.tableId, tableName);
if (pShow->payloadLen > 0 && patternMatch(pShow->payload, tableName, sizeof(tableName) - 1, &info) != TSDB_PATTERN_MATCH) { if (pShow->payloadLen > 0 && patternMatch(pattern, tableName, sizeof(tableName) - 1, &info) != TSDB_PATTERN_MATCH) {
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
} }
...@@ -3419,6 +3463,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3419,6 +3463,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
free(pattern);
return numOfRows; return numOfRows;
} }
......
...@@ -35,4 +35,7 @@ void httpTrimTableName(char *name); ...@@ -35,4 +35,7 @@ void httpTrimTableName(char *name);
int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name); int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name);
char * httpGetCmdsString(HttpContext *pContext, int32_t pos); char * httpGetCmdsString(HttpContext *pContext, int32_t pos);
int32_t httpCheckAllocEscapeSql(char *oldSql, char **newSql);
void httpCheckFreeEscapedSql(char *oldSql, char *newSql);
#endif #endif
...@@ -176,6 +176,16 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -176,6 +176,16 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
return false; return false;
} }
#define ESCAPE_ERROR_PROC(code, context, root) \
do { \
if (code != TSDB_CODE_SUCCESS) { \
httpSendErrorResp(context, code); \
\
cJSON_Delete(root); \
return false; \
} \
} while (0)
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
cJSON* query = cJSON_GetArrayItem(root, i); cJSON* query = cJSON_GetArrayItem(root, i);
if (query == NULL) continue; if (query == NULL) continue;
...@@ -186,7 +196,14 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -186,7 +196,14 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
continue; continue;
} }
int32_t refIdBuffer = httpAddToSqlCmdBuffer(pContext, refId->valuestring); char *newStr = NULL;
int32_t retCode = 0;
retCode = httpCheckAllocEscapeSql(refId->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
int32_t refIdBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(refId->valuestring, newStr);
if (refIdBuffer == -1) { if (refIdBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, refId buffer is full", pContext, pContext->fd, pContext->user); httpWarn("context:%p, fd:%d, user:%s, refId buffer is full", pContext, pContext->fd, pContext->user);
break; break;
...@@ -195,7 +212,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -195,7 +212,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
cJSON* alias = cJSON_GetObjectItem(query, "alias"); cJSON* alias = cJSON_GetObjectItem(query, "alias");
int32_t aliasBuffer = -1; int32_t aliasBuffer = -1;
if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) { if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) {
aliasBuffer = httpAddToSqlCmdBuffer(pContext, alias->valuestring); retCode = httpCheckAllocEscapeSql(alias->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
aliasBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(alias->valuestring, newStr);
if (aliasBuffer == -1) { if (aliasBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, alias buffer is full", pContext, pContext->fd, pContext->user); httpWarn("context:%p, fd:%d, user:%s, alias buffer is full", pContext, pContext->fd, pContext->user);
break; break;
...@@ -211,7 +232,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -211,7 +232,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
continue; continue;
} }
int32_t sqlBuffer = httpAddToSqlCmdBuffer(pContext, sql->valuestring); retCode = httpCheckAllocEscapeSql(sql->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
int32_t sqlBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(sql->valuestring, newStr);
if (sqlBuffer == -1) { if (sqlBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, sql buffer is full", pContext, pContext->fd, pContext->user); httpWarn("context:%p, fd:%d, user:%s, sql buffer is full", pContext, pContext->fd, pContext->user);
break; break;
...@@ -237,6 +262,8 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -237,6 +262,8 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
} }
} }
#undef ESCAPE_ERROR_PROC
pContext->reqType = HTTP_REQTYPE_MULTI_SQL; pContext->reqType = HTTP_REQTYPE_MULTI_SQL;
pContext->encodeMethod = &gcQueryMethod; pContext->encodeMethod = &gcQueryMethod;
pContext->multiCmds->pos = 0; pContext->multiCmds->pos = 0;
......
...@@ -423,3 +423,65 @@ void httpProcessRequest(HttpContext *pContext) { ...@@ -423,3 +423,65 @@ void httpProcessRequest(HttpContext *pContext) {
httpExecCmd(pContext); httpExecCmd(pContext);
} }
} }
int32_t httpCheckAllocEscapeSql(char *oldSql, char **newSql)
{
char *pos;
if (oldSql == NULL || newSql == NULL) {
return TSDB_CODE_SUCCESS;
}
/* bad sql clause */
pos = strstr(oldSql, "%%");
if (pos) {
httpError("bad sql:%s", oldSql);
return TSDB_CODE_HTTP_REQUEST_JSON_ERROR;
}
pos = strchr(oldSql, '%');
if (pos == NULL) {
httpDebug("sql:%s", oldSql);
*newSql = oldSql;
return TSDB_CODE_SUCCESS;
}
*newSql = (char *) calloc(1, (strlen(oldSql) << 1) + 1);
if (newSql == NULL) {
httpError("failed to allocate for new sql, old sql:%s", oldSql);
return TSDB_CODE_HTTP_NO_ENOUGH_MEMORY;
}
char *src = oldSql;
char *dst = *newSql;
size_t sqlLen = strlen(src);
while (1) {
memcpy(dst, src, pos - src + 1);
dst += pos - src + 1;
*dst++ = '%';
if (pos + 1 >= oldSql + sqlLen) {
break;
}
src = ++pos;
pos = strchr(pos, '%');
if (pos == NULL) {
memcpy(dst, src, strlen(src));
break;
}
}
return TSDB_CODE_SUCCESS;
}
void httpCheckFreeEscapedSql(char *oldSql, char *newSql)
{
if (oldSql && newSql) {
if (oldSql != newSql) {
free(newSql);
}
}
}
...@@ -610,7 +610,18 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { ...@@ -610,7 +610,18 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
// stable tag for detail // stable tag for detail
for (int32_t i = 0; i < orderTagsLen; ++i) { for (int32_t i = 0; i < orderTagsLen; ++i) {
cJSON *tag = orderedTags[i]; cJSON *tag = orderedTags[i];
stable_cmd->tagNames[i] = table_cmd->tagNames[i] = httpAddToSqlCmdBuffer(pContext, tag->string);
char *tagStr = NULL;
int32_t retCode = httpCheckAllocEscapeSql(tag->string, &tagStr);
if (retCode != TSDB_CODE_SUCCESS) {
httpSendErrorResp(pContext, retCode);
return false;
}
stable_cmd->tagNames[i] = table_cmd->tagNames[i] = httpAddToSqlCmdBuffer(pContext, tagStr);
httpCheckFreeEscapedSql(tag->string, tagStr);
if (tag->type == cJSON_String) if (tag->type == cJSON_String)
stable_cmd->tagValues[i] = table_cmd->tagValues[i] = httpAddToSqlCmdBuffer(pContext, "'%s'", tag->valuestring); stable_cmd->tagValues[i] = table_cmd->tagValues[i] = httpAddToSqlCmdBuffer(pContext, "'%s'", tag->valuestring);
......
...@@ -589,7 +589,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu ...@@ -589,7 +589,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId); SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t qId);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t prevResultLen, void* merger); int32_t prevResultLen, void* merger);
......
...@@ -106,11 +106,14 @@ typedef struct SQueryInfo { ...@@ -106,11 +106,14 @@ typedef struct SQueryInfo {
STagCond tagCond; STagCond tagCond;
SOrderVal order; SOrderVal order;
int16_t fillType; // final result fill type
int16_t numOfTables; int16_t numOfTables;
STableMetaInfo **pTableMetaInfo; STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf; struct STSBuf *tsBuf;
int16_t fillType; // final result fill type
int64_t * fillVal; // default value for fill int64_t * fillVal; // default value for fill
int32_t numOfFillVal; // fill value size
char * msg; // pointer to the pCmd->payload to keep error message temporarily char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause int64_t clauseLimit; // limit for current sub clause
......
...@@ -58,7 +58,7 @@ ...@@ -58,7 +58,7 @@
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { \ if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { \
__ctx->tag.i64 = (ts); \ __ctx->tag.i64 = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \ } \
aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \
...@@ -520,7 +520,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { ...@@ -520,7 +520,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \ if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \
continue; \ continue; \
} \ } \
TSKEY key = GET_TS_DATA(ctx, i); \ TSKEY key = (ctx)->ptsList != NULL? GET_TS_DATA(ctx, i):0; \
UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \ UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \
} }
...@@ -1463,10 +1463,11 @@ static void first_function(SQLFunctionCtx *pCtx) { ...@@ -1463,10 +1463,11 @@ static void first_function(SQLFunctionCtx *pCtx) {
} }
memcpy(pCtx->pOutput, data, pCtx->inputBytes); memcpy(pCtx->pOutput, data, pCtx->inputBytes);
if (pCtx->ptsList != NULL) {
TSKEY k = GET_TS_DATA(pCtx, i); TSKEY k = GET_TS_DATA(pCtx, i);
DO_UPDATE_TAG_COLUMNS(pCtx, k); DO_UPDATE_TAG_COLUMNS(pCtx, k);
}
SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG; pInfo->hasResult = DATA_SET_FLAG;
pInfo->complete = true; pInfo->complete = true;
......
...@@ -951,7 +951,9 @@ static TSKEY getStartTsKey(SQueryAttr* pQueryAttr, STimeWindow* win, const TSKEY ...@@ -951,7 +951,9 @@ static TSKEY getStartTsKey(SQueryAttr* pQueryAttr, STimeWindow* win, const TSKEY
static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSDataBlock* pSDataBlock) { static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSDataBlock* pSDataBlock) {
sas->numOfCols = (int32_t) pSDataBlock->info.numOfCols; sas->numOfCols = (int32_t) pSDataBlock->info.numOfCols;
sas->pExprInfo = pExprInfo; sas->pExprInfo = pExprInfo;
if (sas->colList != NULL) {
return;
}
sas->colList = calloc(1, pSDataBlock->info.numOfCols*sizeof(SColumnInfo)); sas->colList = calloc(1, pSDataBlock->info.numOfCols*sizeof(SColumnInfo));
for(int32_t i = 0; i < sas->numOfCols; ++i) { for(int32_t i = 0; i < sas->numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, i);
...@@ -2268,10 +2270,11 @@ static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) { ...@@ -2268,10 +2270,11 @@ static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) {
return status; return status;
} }
static void doExchangeTimeWindow(SQInfo* pQInfo, STimeWindow* win) { static void doUpdateLastKey(SQueryAttr* pQueryAttr) {
SQueryAttr* pQueryAttr = &pQInfo->query; STimeWindow* win = &pQueryAttr->window;
size_t t = taosArrayGetSize(pQueryAttr->tableGroupInfo.pGroupList);
for(int32_t i = 0; i < t; ++i) { size_t num = taosArrayGetSize(pQueryAttr->tableGroupInfo.pGroupList);
for(int32_t i = 0; i < num; ++i) {
SArray* p1 = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); SArray* p1 = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
size_t len = taosArrayGetSize(p1); size_t len = taosArrayGetSize(p1);
...@@ -2286,7 +2289,7 @@ static void doExchangeTimeWindow(SQInfo* pQInfo, STimeWindow* win) { ...@@ -2286,7 +2289,7 @@ static void doExchangeTimeWindow(SQInfo* pQInfo, STimeWindow* win) {
} }
} }
static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool stableQuery) { static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool stableQuery) {
SQueryAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; SQueryAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
// in case of point-interpolation query, use asc order scan // in case of point-interpolation query, use asc order scan
...@@ -2303,6 +2306,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2303,6 +2306,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
} }
pQueryAttr->needReverseScan = false;
return; return;
} }
...@@ -2312,7 +2316,8 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2312,7 +2316,8 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
} }
doExchangeTimeWindow(pQInfo, &pQueryAttr->window); pQueryAttr->needReverseScan = false;
doUpdateLastKey(pQueryAttr);
return; return;
} }
...@@ -2333,20 +2338,22 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2333,20 +2338,22 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo, &pQueryAttr->window); doUpdateLastKey(pQueryAttr);
} }
pQueryAttr->order.order = TSDB_ORDER_ASC; pQueryAttr->order.order = TSDB_ORDER_ASC;
pQueryAttr->needReverseScan = false;
} else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) { } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey, qDebug(msg, pQInfo, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo, &pQueryAttr->window); doUpdateLastKey(pQueryAttr);
} }
pQueryAttr->order.order = TSDB_ORDER_DESC; pQueryAttr->order.order = TSDB_ORDER_DESC;
pQueryAttr->needReverseScan = false;
} }
} else { // interval query } else { // interval query
...@@ -2357,20 +2364,22 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2357,20 +2364,22 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo, &pQueryAttr->window); doUpdateLastKey(pQueryAttr);
} }
pQueryAttr->order.order = TSDB_ORDER_ASC; pQueryAttr->order.order = TSDB_ORDER_ASC;
pQueryAttr->needReverseScan = false;
} else if (onlyLastQuery(pQueryAttr)) { } else if (onlyLastQuery(pQueryAttr)) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC, qDebug(msg, pQInfo, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
doExchangeTimeWindow(pQInfo, &pQueryAttr->window); doUpdateLastKey(pQueryAttr);
} }
pQueryAttr->order.order = TSDB_ORDER_DESC; pQueryAttr->order.order = TSDB_ORDER_DESC;
pQueryAttr->needReverseScan = false;
} }
} }
} }
...@@ -2388,9 +2397,6 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i ...@@ -2388,9 +2397,6 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
while(((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) { while(((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) {
*ps = ((*ps) << 1u); *ps = ((*ps) << 1u);
} }
// pRuntimeEnv->numOfRowsPerPage = ((*ps) - sizeof(tFilePage)) / (*rowsize);
// assert(pRuntimeEnv->numOfRowsPerPage <= MAX_ROWS_PER_RESBUF_PAGE);
} }
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
...@@ -4382,7 +4388,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4382,7 +4388,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
break; break;
} }
case OP_DataBlocksOptScan: { case OP_DataBlocksOptScan: {
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), 1); pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
break; break;
} }
case OP_TableScan: { case OP_TableScan: {
...@@ -4420,8 +4426,10 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4420,8 +4426,10 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
if (pQInfo->summary.queryProfEvents == NULL) { if (pQInfo->summary.queryProfEvents == NULL) {
qDebug("QInfo:0x%"PRIx64" failed to allocate query prof events array", pQInfo->qId); qDebug("QInfo:0x%"PRIx64" failed to allocate query prof events array", pQInfo->qId);
} }
pQInfo->summary.operatorProfResults = pQInfo->summary.operatorProfResults =
taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK); taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK);
if (pQInfo->summary.operatorProfResults == NULL) { if (pQInfo->summary.operatorProfResults == NULL) {
qDebug("QInfo:0x%"PRIx64" failed to allocate operator prof results hash", pQInfo->qId); qDebug("QInfo:0x%"PRIx64" failed to allocate operator prof results hash", pQInfo->qId);
} }
...@@ -4814,7 +4822,6 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -4814,7 +4822,6 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
pInfo->current = 0; pInfo->current = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
// pInfo->prevGroupId = -1;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->name = "DataBlocksOptimizedScanOperator";
...@@ -5979,8 +5986,13 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 ...@@ -5979,8 +5986,13 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
pCols[i].colId = pExpr[i].base.resColId; pCols[i].colId = pExpr[i].base.resColId;
pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters; pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters;
pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo)); if (pCols[i].flist.numOfFilters != 0) {
memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo)); pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo));
memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo));
} else {
// avoid runtime error
pCols[i].flist.filterInfo = NULL;
}
} }
assert(numOfFilter > 0); assert(numOfFilter > 0);
...@@ -6439,10 +6451,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -6439,10 +6451,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
if (isNull(val, type)) { if (isNull(val, type)) {
continue; continue;
} }
int dummy;
void* res = taosHashGet(pInfo->pSet, val, bytes); void* res = taosHashGet(pInfo->pSet, val, bytes);
if (res == NULL) { if (res == NULL) {
taosHashPut(pInfo->pSet, val, bytes, NULL, 0); taosHashPut(pInfo->pSet, val, bytes, &dummy, sizeof(dummy));
char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows; char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
memcpy(start, val, bytes); memcpy(start, val, bytes);
pRes->info.rows += 1; pRes->info.rows += 1;
...@@ -7366,7 +7378,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { ...@@ -7366,7 +7378,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId,
char* sql, uint64_t *qId) { char* sql, uint64_t qId) {
int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput; int16_t numOfOutput = pQueryMsg->numOfOutput;
...@@ -7375,7 +7387,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -7375,7 +7387,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
goto _cleanup_qinfo; goto _cleanup_qinfo;
} }
pQInfo->qId = *qId; pQInfo->qId = qId;
// to make sure third party won't overwrite this structure // to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo; pQInfo->signature = pQInfo;
...@@ -7485,7 +7497,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -7485,7 +7497,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
tsem_init(&pQInfo->ready, 0, 0); tsem_init(&pQInfo->ready, 0, 0);
pQueryAttr->window = pQueryMsg->window; pQueryAttr->window = pQueryMsg->window;
changeExecuteScanOrder(pQInfo, pQueryMsg, pQueryAttr->stableQuery); updateDataCheckOrder(pQInfo, pQueryMsg, pQueryAttr->stableQuery);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STimeWindow window = pQueryAttr->window; STimeWindow window = pQueryAttr->window;
......
...@@ -162,7 +162,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -162,7 +162,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
assert(pQueryMsg->stableQuery == isSTableQuery); assert(pQueryMsg->stableQuery == isSTableQuery);
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo,
param.pTagColumnInfo, vgId, param.sql, qId); param.pTagColumnInfo, vgId, param.sql, *qId);
param.sql = NULL; param.sql = NULL;
param.pExprs = NULL; param.pExprs = NULL;
......
...@@ -98,6 +98,8 @@ typedef struct SIOCostSummary { ...@@ -98,6 +98,8 @@ typedef struct SIOCostSummary {
int64_t blockLoadTime; int64_t blockLoadTime;
int64_t statisInfoLoadTime; int64_t statisInfoLoadTime;
int64_t checkForNextTime; int64_t checkForNextTime;
int64_t headFileLoad;
int64_t headFileLoadTime;
} SIOCostSummary; } SIOCostSummary;
typedef struct STsdbQueryHandle { typedef struct STsdbQueryHandle {
...@@ -1045,15 +1047,21 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -1045,15 +1047,21 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
*numOfBlocks = 0; *numOfBlocks = 0;
pQueryHandle->cost.headFileLoad += 1;
int64_t s = taosGetTimestampUs();
size_t numOfTables = 0; size_t numOfTables = 0;
if (pQueryHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { if (pQueryHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
code = loadBlockInfo(pQueryHandle, pQueryHandle->activeIndex, numOfBlocks); code = loadBlockInfo(pQueryHandle, pQueryHandle->activeIndex, numOfBlocks);
} else if (pQueryHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) { } else if (pQueryHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
code = loadBlockInfo(pQueryHandle, i, numOfBlocks); code = loadBlockInfo(pQueryHandle, i, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
int64_t e = taosGetTimestampUs();
pQueryHandle->cost.headFileLoadTime += (e - s);
return code; return code;
} }
} }
...@@ -1061,6 +1069,8 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -1061,6 +1069,8 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
assert(0); assert(0);
} }
int64_t e = taosGetTimestampUs();
pQueryHandle->cost.headFileLoadTime += (e - s);
return code; return code;
} }
...@@ -3814,8 +3824,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -3814,8 +3824,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
SIOCostSummary* pCost = &pQueryHandle->cost; SIOCostSummary* pCost = &pQueryHandle->cost;
tsdbDebug("%p :io-cost summary: statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, 0x%"PRIx64, tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, 0x%"PRIx64,
pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qId); pQueryHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qId);
tfree(pQueryHandle); tfree(pQueryHandle);
} }
......
...@@ -403,6 +403,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, "tag value can not mor ...@@ -403,6 +403,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, "tag value can not mor
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, "value not find") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, "value not find")
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, "value type should be boolean, number or string") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, "value type should be boolean, number or string")
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_REQUEST_JSON_ERROR, "http request json error")
// odbc // odbc
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, "out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, "out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, "convertion not a valid literal input") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, "convertion not a valid literal input")
......
...@@ -173,10 +173,12 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) { ...@@ -173,10 +173,12 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
STaosQall *qall = (STaosQall *)p2; STaosQall *qall = (STaosQall *)p2;
int code = 0; int code = 0;
bool empty;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
if (queue->head) { empty = queue->head == NULL;
if (!empty) {
memset(qall, 0, sizeof(STaosQall)); memset(qall, 0, sizeof(STaosQall));
qall->current = queue->head; qall->current = queue->head;
qall->start = queue->head; qall->start = queue->head;
...@@ -188,11 +190,17 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) { ...@@ -188,11 +190,17 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) {
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
return code; // if source queue is empty, we set destination qall to empty too.
if (empty) {
qall->current = NULL;
qall->start = NULL;
qall->numOfItems = 0;
}
return code;
} }
int taosGetQitem(taos_qall param, int *type, void **pitem) { int taosGetQitem(taos_qall param, int *type, void **pitem) {
...@@ -423,10 +431,22 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { ...@@ -423,10 +431,22 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
int taosGetQueueItemsNumber(taos_queue param) { int taosGetQueueItemsNumber(taos_queue param) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
return queue->numOfItems; if (!queue) return 0;
int num;
pthread_mutex_lock(&queue->mutex);
num = queue->numOfItems;
pthread_mutex_unlock(&queue->mutex);
return num;
} }
int taosGetQsetItemsNumber(taos_qset param) { int taosGetQsetItemsNumber(taos_qset param) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
return qset->numOfItems; if (!qset) return 0;
int num = 0;
pthread_mutex_lock(&qset->mutex);
num = qset->numOfItems;
pthread_mutex_unlock(&qset->mutex);
return num;
} }
...@@ -90,7 +90,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -90,7 +90,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); bool force = (pWrite == NULL ? false : pWrite->walHead.msgType != TSDB_MSG_TYPE_SUBMIT);
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force); syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
if (syncCode < 0) { if (syncCode < 0) {
pHead->version = 0; pHead->version = 0;
...@@ -237,7 +237,7 @@ static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32 ...@@ -237,7 +237,7 @@ static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32
return NULL; return NULL;
} }
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SVWriteMsg) + pHead->len;
SVWriteMsg *pWrite = taosAllocateQitem(size); SVWriteMsg *pWrite = taosAllocateQitem(size);
if (pWrite == NULL) { if (pWrite == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY; terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
...@@ -248,7 +248,7 @@ static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32 ...@@ -248,7 +248,7 @@ static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32
pWrite->rpcMsg = *pRpcMsg; pWrite->rpcMsg = *pRpcMsg;
} }
memcpy(&pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); memcpy(&pWrite->walHead, pHead, sizeof(SWalHead) + pHead->len);
pWrite->pVnode = pVnode; pWrite->pVnode = pVnode;
pWrite->qtype = qtype; pWrite->qtype = qtype;
...@@ -286,7 +286,7 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { ...@@ -286,7 +286,7 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
} }
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len); int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->walHead.len);
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) { if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
...@@ -330,7 +330,7 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { ...@@ -330,7 +330,7 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
if (pVnode) { if (pVnode) {
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len); int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->walHead.len);
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite, vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
pWrite->rpcMsg.ahandle, queued, queuedSize); pWrite->rpcMsg.ahandle, queued, queuedSize);
......
...@@ -36,9 +36,16 @@ static int32_t walInitObj(SWal *pWal); ...@@ -36,9 +36,16 @@ static int32_t walInitObj(SWal *pWal);
static void walFreeObj(void *pWal); static void walFreeObj(void *pWal);
int32_t walInit() { int32_t walInit() {
int32_t code = 0;
tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
int32_t code = walCreateThread(); code = pthread_mutex_init(&tsWal.mutex, NULL);
if (code) {
wError("failed to init wal mutex since %s", tstrerror(code));
return code;
}
code = walCreateThread();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
wError("failed to init wal module since %s", tstrerror(code)); wError("failed to init wal module since %s", tstrerror(code));
return code; return code;
...@@ -51,6 +58,7 @@ int32_t walInit() { ...@@ -51,6 +58,7 @@ int32_t walInit() {
void walCleanUp() { void walCleanUp() {
walStopThread(); walStopThread();
taosCloseRef(tsWal.refId); taosCloseRef(tsWal.refId);
pthread_mutex_destroy(&tsWal.mutex);
wInfo("wal module is cleaned up"); wInfo("wal module is cleaned up");
} }
...@@ -183,10 +191,15 @@ static void walFsyncAll() { ...@@ -183,10 +191,15 @@ static void walFsyncAll() {
} }
static void *walThreadFunc(void *param) { static void *walThreadFunc(void *param) {
int stop = 0;
while (1) { while (1) {
walUpdateSeq(); walUpdateSeq();
walFsyncAll(); walFsyncAll();
if (tsWal.stop) break;
pthread_mutex_lock(&tsWal.mutex);
stop = tsWal.stop;
pthread_mutex_unlock(&tsWal.mutex);
if (stop) break;
} }
return NULL; return NULL;
...@@ -209,7 +222,10 @@ static int32_t walCreateThread() { ...@@ -209,7 +222,10 @@ static int32_t walCreateThread() {
} }
static void walStopThread() { static void walStopThread() {
pthread_mutex_lock(&tsWal.mutex);
tsWal.stop = 1; tsWal.stop = 1;
pthread_mutex_unlock(&tsWal.mutex);
if (taosCheckPthreadValid(tsWal.thread)) { if (taosCheckPthreadValid(tsWal.thread)) {
pthread_join(tsWal.thread, NULL); pthread_join(tsWal.thread, NULL);
} }
......
...@@ -240,6 +240,7 @@ python3 ./test.py -f query/nestedQuery/queryInterval.py ...@@ -240,6 +240,7 @@ python3 ./test.py -f query/nestedQuery/queryInterval.py
python3 ./test.py -f query/queryStateWindow.py python3 ./test.py -f query/queryStateWindow.py
python3 ./test.py -f query/nestedQuery/queryWithOrderLimit.py python3 ./test.py -f query/nestedQuery/queryWithOrderLimit.py
python3 ./test.py -f query/nestquery_last_row.py python3 ./test.py -f query/nestquery_last_row.py
python3 ./test.py -f query/queryCnameDisplay.py
#stream #stream
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import string
import random
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def getLongName(self, len, mode = "mixed"):
"""
generate long str
"""
chars = ''.join(random.choice(string.ascii_letters.lower()) for i in range(len))
return chars
def checkRegularTableCname(self):
"""
check regular table cname
"""
# len(colName) <=64, generate cname list and make first param = 63 and second param = 65
cname_list = []
for i in range(10):
cname_list.append(self.getLongName(64))
cname_list[0] = self.getLongName(63)
cname_list[1] = self.getLongName(65)
# create table and insert data
tdSql.execute("CREATE TABLE regular_table_cname_check (ts timestamp, pi1 int, pi2 bigint, pf1 float, pf2 double, ps1 binary(10), pi3 smallint, pi4 tinyint, pb1 bool, ps2 nchar(20))")
tdSql.execute('insert into regular_table_cname_check values (now, 1, 2, 1.1, 2.2, "a", 1, 1, true, "aa");')
tdSql.execute('insert into regular_table_cname_check values (now, 2, 3, 1.2, 2.3, "b", 2, 1, false, "aa");')
tdSql.execute('insert into regular_table_cname_check values (now, 3, 4, 1.3, 2.4, "c", 1, 3, true, "bb");')
# select as cname with cname_list
sql_seq = f'select count(ts) as {cname_list[0]}, sum(pi1) as {cname_list[1]}, avg(pi2) as {cname_list[2]}, count(pf1) as {cname_list[3]}, count(pf2) as {cname_list[4]}, count(ps1) as {cname_list[5]}, min(pi3) as {cname_list[6]}, max(pi4) as {cname_list[7]}, count(pb1) as {cname_list[8]}, count(ps2) as {cname_list[9]} from regular_table_cname_check'
sql_seq_no_as = sql_seq.replace('as ', '')
res = tdSql.getColNameList(sql_seq)
res_no_as = tdSql.getColNameList(sql_seq_no_as)
# cname[1] > 64, it is expected to be equal to 64
cname_list_1_expected = cname_list[1][:-1]
cname_list[1] = cname_list_1_expected
checkColNameList = tdSql.checkColNameList(res, cname_list)
checkColNameList = tdSql.checkColNameList(res_no_as, cname_list)
def checkSuperTableCname(self):
"""
check super table cname
"""
# len(colName) <=64, generate cname list and make first param = 63 and second param = 65
cname_list = []
for i in range(19):
cname_list.append(self.getLongName(64))
cname_list[0] = self.getLongName(63)
cname_list[1] = self.getLongName(65)
# create table and insert data
tdSql.execute("create table super_table_cname_check (ts timestamp, pi1 int, pi2 bigint, pf1 float, pf2 double, ps1 binary(10), pi3 smallint, pi4 tinyint, pb1 bool, ps2 nchar(20)) tags (si1 int, si2 bigint, sf1 float, sf2 double, ss1 binary(10), si3 smallint, si4 tinyint, sb1 bool, ss2 nchar(20));")
tdSql.execute('create table st1 using super_table_cname_check tags (1, 2, 1.1, 2.2, "a", 1, 1, true, "aa");')
tdSql.execute('insert into st1 values (now, 1, 2, 1.1, 2.2, "a", 1, 1, true, "aa");')
tdSql.execute('insert into st1 values (now, 1, 1, 1.4, 2.3, "b", 3, 2, true, "aa");')
tdSql.execute('insert into st1 values (now, 1, 2, 1.1, 2.2, "a", 1, 1, false, "bb");')
# select as cname with cname_list
sql_seq = f'select count(ts) as {cname_list[0]}, sum(pi1) as {cname_list[1]}, avg(pi2) as {cname_list[2]}, count(pf1) as {cname_list[3]}, count(pf2) as {cname_list[4]}, count(ps1) as {cname_list[5]}, min(pi3) as {cname_list[6]}, max(pi4) as {cname_list[7]}, count(pb1) as {cname_list[8]}, count(ps2) as {cname_list[9]}, count(si1) as {cname_list[10]}, count(si2) as {cname_list[11]}, count(sf1) as {cname_list[12]}, count(sf2) as {cname_list[13]}, count(ss1) as {cname_list[14]}, count(si3) as {cname_list[15]}, count(si4) as {cname_list[16]}, count(sb1) as {cname_list[17]}, count(ss2) as {cname_list[18]} from super_table_cname_check'
sql_seq_no_as = sql_seq.replace('as ', '')
res = tdSql.getColNameList(sql_seq)
res_no_as = tdSql.getColNameList(sql_seq_no_as)
# cname[1] > 64, it is expected to be equal to 64
cname_list_1_expected = cname_list[1][:-1]
cname_list[1] = cname_list_1_expected
checkColNameList = tdSql.checkColNameList(res, cname_list)
checkColNameList = tdSql.checkColNameList(res_no_as, cname_list)
def run(self):
tdSql.prepare()
self.checkRegularTableCname()
self.checkSuperTableCname()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -79,6 +79,21 @@ class TDSql: ...@@ -79,6 +79,21 @@ class TDSql:
raise Exception(repr(e)) raise Exception(repr(e))
return self.queryRows return self.queryRows
def getColNameList(self, sql):
self.sql = sql
try:
col_name_list = []
self.cursor.execute(sql)
self.queryCols = self.cursor.description
for query_col in self.queryCols:
col_name_list.append(query_col[0])
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return col_name_list
def waitedQuery(self, sql, expectRows, timeout): def waitedQuery(self, sql, expectRows, timeout):
tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout)) tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout))
self.sql = sql self.sql = sql
...@@ -209,6 +224,14 @@ class TDSql: ...@@ -209,6 +224,14 @@ class TDSql:
tdLog.info("sql:%s, affectedRows:%d == expect:%d" % (self.sql, self.affectedRows, expectAffectedRows)) tdLog.info("sql:%s, affectedRows:%d == expect:%d" % (self.sql, self.affectedRows, expectAffectedRows))
def checkColNameList(self, col_name_list, expect_col_name_list):
if col_name_list == expect_col_name_list:
tdLog.info("sql:%s, col_name_list:%s == expect_col_name_list:%s" % (self.sql, col_name_list, expect_col_name_list))
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, col_name_list, expect_col_name_list)
tdLog.exit("%s(%d) failed: sql:%s, col_name_list:%s != expect_col_name_list:%s" % args)
def taosdStatus(self, state): def taosdStatus(self, state):
tdLog.sleep(5) tdLog.sleep(5)
pstate = 0 pstate = 0
......
#!/bin/bash
stty erase '^H'
stty erase '^?'
# 运行前需要安装expect; apt install expect
# 运行方式:
# ./cluster.sh -c xxx.cfg
# cfg文件内格式: 每行代表一个节点 第一列为external ip、第二列为密码、第三列为用户名、第四列为hostname、第五列为interal ip
# 注意:列与列直接用空格隔开
# 例子:
# 51.143.97.155 tbase125! root node5 10.2.0.10
# 20.94.253.116 tbase125! root node2 10.2.0.12
# 20.94.250.236 tbase125! root node3 10.2.0.13
# 20.98.72.51 tbase125! root node4 10.2.0.14
menu(){
echo "=============================="
echo "-------------Target-----------"
echo "=============================="
echo "1 cluster"
echo "=============================="
echo "2 dnode"
echo "=============================="
echo "3 arbitrator"
echo "=============================="
echo "4 exit"
echo "=============================="
}
cluster_menu(){
echo "=============================="
echo "----------Operation-----------"
echo "=============================="
echo "1 start cluster"
echo "=============================="
echo "2 stop cluster"
echo "=============================="
echo "3 exit"
echo "=============================="
}
dnode_menu(){
echo "=============================="
echo "----------Operation-----------"
echo "=============================="
echo "1 start dnode"
echo "=============================="
echo "2 stop dnode"
echo "=============================="
echo "3 add dnode"
echo "=============================="
echo "4 drop dnode"
echo "=============================="
echo "5 exit"
echo "=============================="
}
arbitrator_menu(){
echo "=============================="
echo "----------Operation-----------"
echo "=============================="
echo "1 start arbitrator"
echo "=============================="
echo "2 stop arbitrator"
echo "=============================="
echo "3 exit"
echo "=============================="
}
print_cfg() {
echo "=============================="
echo "-------Configure file---------"
echo "=============================="
echo "Id | IP address | hostname"
i=1
while read line || [[ -n ${line} ]]
do
arr=($line)
echo " $i | ${arr[0]} | ${arr[3]}"
i=`expr $i + 1`;
done < $1
echo "=============================="
}
update(){
expect -c "
set timeout -1;
spawn ssh $3@$1;
expect {
*yes/no* { send \"yes\r\"; exp_continue }
*assword:* { send \"$2\r\" }
}
expect {
*#* { send \"systemctl $4 taosd\r\" }
}
expect {
*#* { send \"exit\r\" }
}
expect eof;
"
echo -e "\033[32mdnode successfully $4 \033[0m"
}
update_dnode(){
i=1
while read line || [[ -n ${line} ]]
do
if [[ $1 -eq $i ]]; then
arr=($line)
update ${arr[0]} ${arr[1]} ${arr[2]} $2
break;
fi
i=`expr $i + 1`;
done < $3
}
add_hosts() {
expect -c "
set timeout -1;
spawn ssh $3@$1;
expect {
*yes/no* { send \"yes\r\"; exp_continue }
*assword:* { send \"$2\r\" }
}
expect {
*#* { send \"echo $4 $5 >> /etc/hosts\r\" }
}
expect {
*#* { send \"exit\r\" }
}
expect eof;
"
echo -e "\033[32mSuccessfully add to /etc/hosts in $1\033[0m"
}
remove_hosts() {
expect -c "
set timeout -1;
spawn ssh $3@$1;
expect {
*yes/no* { send \"yes\r\"; exp_continue }
*assword:* { send \"$2\r\" }
}
expect {
*#* { send \"sed -i '/$4/d\' /etc/hosts\r\" }
}
expect {
*#* { send \"exit\r\" }
}
expect eof;
"
echo -e "\033[32mSuccessfully remove from /etc/hosts in $1\033[0m"
}
remove_varlibtaos() {
expect -c "
set timeout -1;
spawn ssh $3@$1;
expect {
*yes/no* { send \"yes\r\"; exp_continue }
*assword:* { send \"$2\r\" }
}
expect {
*#* { send \"rm -rf /var/lib/taos/*\r\" }
}
expect {
*#* { send \"exit\r\" }
}
expect eof;
"
echo -e "\033[32mSuccessfully remove /var/lib/taos/* in $1\033[0m"
}
scp_cfg() {
expect -c "
set timeout -1;
spawn scp /etc/taos/taos.cfg $3@$1:/etc/taos;
expect {
*yes/no* { send \"yes\r\"; exp_continue }
*assword:* { send \"$2\r\" }
}
expect eof;
"
echo -e "\033[32mSuccessfully scp /etc/taos/taos.cfg to $1\033[0m"
}
manage_dnode(){
i=1
while read line || [[ -n ${line} ]]
do
if [[ $1 -eq $i ]]; then
arr=($line)
scp_cfg ${arr[0]} ${arr[1]} ${arr[2]}
ip=${arr[0]}
pd=${arr[1]}
user=${arr[2]}
j=1
while read line2 || [[ -n ${line2} ]]
do
arr2=($line2)
if [[ $1 -ne $j ]]; then
if [ $3 == "create" ];then
echo "$3"
add_hosts $ip $pd $user ${arr2[4]} ${arr2[3]}
else
remove_hosts $ip $pd $user ${arr2[4]} ${arr2[3]}
fi
fi
j=`expr $j + 1`;
done < $2
remove_varlibtaos $ip $pd $user
if [ $3 == "create" ];then
update $ip $pd $user "start"
else
update $ip $pd $user "stop"
fi
taos -s "$3 dnode \"${arr[3]}:6030\""
break;
fi
i=`expr $i + 1`;
done < $2
echo -e "\033[32mSuccessfully $3 dnode id $1\033[0m"
}
update_cluster() {
while read line || [[ -n ${line} ]]
do
arr=($line)
if [ $1 == "start" ]; then
scp_cfg ${arr[0]} ${arr[1]} ${arr[2]}
fi
update ${arr[0]} ${arr[1]} ${arr[2]} $1
done < $2
}
while :
do
clear
menu
read -p "select mode: " n
case $n in
1)
clear
print_cfg $2
cluster_menu
read -p "select operation: " c
case $c in
1)
update_cluster "start" $2
break
;;
2)
update_cluster "stop" $2
break
;;
3)
break
;;
esac
;;
2)
clear
print_cfg $2
dnode_menu
read -p "select operation: " d
case $d in
1)
clear
print_cfg $2
read -p "select dnode: " id
update_dnode $id "start" $2
break
;;
2)
clear
print_cfg $2
read -p "select dnode: " id
update_dnode $id "stop" $2
break
;;
3)
clear
print_cfg $2
read -p "select dnode: " id
manage_dnode $id $2 "create"
break
;;
4)
clear
print_cfg $2
read -p "select dnode: " id
manage_dnode $id $2 "drop"
break
;;
5)
break
;;
esac
;;
3)
clear
arbitrator_menu
read -p "select operation: " m
case $m in
1)
nohup /usr/local/taos/bin/tarbitrator >/dev/null 2>&1 &
echo -e "\033[32mSuccessfully start arbitrator $3 \033[0m"
break
;;
2)
var=`ps -ef | grep tarbitrator | awk '{print $2}' | head -n 1`
kill -9 $var
break
;;
3)
break
;;
esac
;;
4)
break
;;
esac
done
...@@ -247,4 +247,25 @@ if $system_content != @[{"refId":"A","target":"{val1:nil, val2:nil}","datapoints ...@@ -247,4 +247,25 @@ if $system_content != @[{"refId":"A","target":"{val1:nil, val2:nil}","datapoints
return -1 return -1
endi endi
sql create table tt (ts timestamp ,i int) tags(j binary(20),k binary(20));
sql insert into t1 using tt tags('jnetworki','t1') values('2020-01-01 00:00:00.000',1)('2020-01-01 00:01:00.000',2)('2020-01-01 00:02:00.000',3)('2020-01-01 00:03:00.000',4)('2020-01-01 00:04:00.000',5);
system_content curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d '[ {"refId":"A","alias":"","sql":"select max(i) from db.tt where j like \u0027%network%\u0027 and ts >= \u00272020-01-01 00:00:00.000\u0027 and ts < \u00272020-01-01 00:05:00.000\u0027 interval(5m) group by k "} ]' 127.0.0.1:7111/grafana/query
print step1-> $system_content
if $system_content != @[{"refId":"A","target":"{k:t1}","datapoints":[[5,1577808000000]]}]@ then
return -1
endi
system_content curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d '[ {"refId":"A","alias":"","sql":"select max(i) from db.tt where j like \u0027jnetwo%\u0027 and ts >= \u00272020-01-01 00:00:00.000\u0027 and ts < \u00272020-01-01 00:05:00.000\u0027 interval(5m) group by k "} ]' 127.0.0.1:7111/grafana/query
print step1-> $system_content
if $system_content != @[{"refId":"A","target":"{k:t1}","datapoints":[[5,1577808000000]]}]@ then
return -1
endi
system_content curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d '[ {"refId":"A","alias":"","sql":"select max(i) from db.tt where j like \u0027%networki\u0027 and ts >= \u00272020-01-01 00:00:00.000\u0027 and ts < \u00272020-01-01 00:05:00.000\u0027 interval(5m) group by k "} ]' 127.0.0.1:7111/grafana/query
print step1-> $system_content
if $system_content != @[{"refId":"A","target":"{k:t1}","datapoints":[[5,1577808000000]]}]@ then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
...@@ -193,7 +193,7 @@ if $data02 != 0.000000000 then ...@@ -193,7 +193,7 @@ if $data02 != 0.000000000 then
return -1 return -1
endi endi
if $data03 != 0.000000000 then if $data03 != NULL then
return -1 return -1
endi endi
...@@ -444,7 +444,7 @@ if $data02 != 8.077777778 then ...@@ -444,7 +444,7 @@ if $data02 != 8.077777778 then
return -1 return -1
endi endi
if $data03 != inf then if $data03 != NULL then
return -1 return -1
endi endi
......
...@@ -334,10 +334,6 @@ sql select top(x, 20) from (select c1 x from nest_tb0); ...@@ -334,10 +334,6 @@ sql select top(x, 20) from (select c1 x from nest_tb0);
sql select bottom(x, 20) from (select c1 x from nest_tb0) sql select bottom(x, 20) from (select c1 x from nest_tb0)
print ===================> complex query
print ===================> group by + having print ===================> group by + having
...@@ -464,6 +460,28 @@ if $data01 != 0.000083333 then ...@@ -464,6 +460,28 @@ if $data01 != 0.000083333 then
return -1 return -1
endi endi
print ======================>TD-5271
sql select min(val),max(val),first(val),last(val),count(val),sum(val),avg(val) from (select count(*) val from nest_mt0 group by tbname)
if $rows != 1 then
return -1
endi
if $data00 != 10000 then
return -1
endi
if $data01 != 10000 then
return -1
endi
if $data04 != 10 then
return -1
endi
if $data05 != 100000 then
return -1
endi
print =================>us database interval query, TD-5039 print =================>us database interval query, TD-5039
sql create database test precision 'us'; sql create database test precision 'us';
sql use test; sql use test;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册