diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index 9d61b0df68f3480834fddacd5d701ee0559352e8..fa289c277cf273396375690c8a496fd4eacd77fe 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -100,6 +100,9 @@ # default system charset # charset UTF-8 +# system time zone +# timezone Asia/Shanghai (CST, +0800) + # enable/disable commit log # clog 1 diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 6666fa24a605c186678b2642b9735432b75a8df4..da4ebc4e9cadbeda7755ad35d57889035ffcd0b2 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -107,14 +107,6 @@ void tscAddSpecialColumnForSelect(SSqlCmd* pCmd, int32_t outputColIndex, int16_t void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex, int32_t tableIndex); -//TODO refactor, remove -void SStringFree(SString* str); -void SStringCopy(SString* pDest, const SString* pSrc); -SString SStringCreate(const char* str); - -int32_t SStringAlloc(SString* pStr, int32_t size); -int32_t SStringEnsureRemain(SString* pStr, int32_t size); - int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName, int32_t tableIndex); void tscClearInterpInfo(SSqlCmd* pCmd); @@ -226,7 +218,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIn void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex); -int16_t tscGetJoinTagColIndexByUid(SSqlCmd* pCmd, uint64_t uid); +int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid); TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void (*fp)(void*, TAOS_RES*, int), void* param, void** taos); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 6226cf1f1d836c8ba42d131b42c03a644ba0ce74..d439ba9929ac7c58eb08d946cc7289fd1d8dbee7 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -188,7 +188,7 @@ typedef struct SString { typedef struct SCond { uint64_t uid; - SString cond; + char* cond; } SCond; typedef struct SJoinNode { diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index d11a279247ff170ba4b931b11a93589e3e2b3693..abf91e7c4339bdebe7c72f14551da0e1d53132c5 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -51,7 +51,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, } int32_t sqlLen = strlen(sqlstr); - if (sqlLen > TSDB_MAX_SQL_LEN) { + if (sqlLen > tsMaxSQLStringLen) { tscError("sql string too long"); tscQueueAsyncError(fp, param); return; diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 17ea5cf8862f3fdb42f95cc80b84acf9394d26ae..ed44d540667284e6af66e184152d0b2fe0a27dc3 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -307,7 +307,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0); assert(pNew->cmd.tagCond.joinInfo.hasJoin); - int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd, pMeterMetaInfo->pMeterMeta->uid); + int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd.tagCond, pMeterMetaInfo->pMeterMeta->uid); pExpr->param[0].i64Key = tagColIndex; pExpr->numOfParams = 1; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ba465c28b963155ef83b9d9617267fe0c0b26beb..d0aa290d3183dd549f3065863037e6d80db0d4ae 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -21,6 +21,7 @@ #include "taosmsg.h" #include "tstoken.h" #include "ttime.h" +#include "tstrbuild.h" #include "tscUtil.h" #include "tschemautil.h" @@ -3103,26 +3104,23 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString) { return TSDB_CODE_SUCCESS; } -static int32_t tablenameListToString(tSQLExpr* pExpr, char* str) { +static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/SStringBuilder* sb) { tSQLExprList* pList = pExpr->pParam; if (pList->nExpr <= 0) { return TSDB_CODE_INVALID_SQL; } if (pList->nExpr > 0) { - strcpy(str, QUERY_COND_REL_PREFIX_IN); - str += QUERY_COND_REL_PREFIX_IN_LEN; + taosStringBuilderAppendStringLen(sb, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN); } int32_t len = 0; for (int32_t i = 0; i < pList->nExpr; ++i) { tSQLExpr* pSub = pList->a[i].pNode; - strncpy(str + len, pSub->val.pz, pSub->val.nLen); - - len += pSub->val.nLen; + taosStringBuilderAppendStringLen(sb, pSub->val.pz, pSub->val.nLen); if (i < pList->nExpr - 1) { - str[len++] = TBNAME_LIST_SEP[0]; + taosStringBuilderAppendString(sb, TBNAME_LIST_SEP); } if (pSub->val.nLen <= 0 || pSub->val.nLen > TSDB_METER_NAME_LEN) { @@ -3133,11 +3131,9 @@ static int32_t tablenameListToString(tSQLExpr* pExpr, char* str) { return TSDB_CODE_SUCCESS; } -static int32_t tablenameCondToString(tSQLExpr* pExpr, char* str) { - strcpy(str, QUERY_COND_REL_PREFIX_LIKE); - str += strlen(QUERY_COND_REL_PREFIX_LIKE); - - strcpy(str, pExpr->val.pz); +static int32_t tablenameCondToString(tSQLExpr* pExpr, /*char* str*/SStringBuilder* sb) { + taosStringBuilderAppendStringLen(sb, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN); + taosStringBuilderAppendString(sb, pExpr->val.pz); return TSDB_CODE_SUCCESS; } @@ -3241,7 +3237,7 @@ static int32_t getTagCondString(SSqlCmd* pCmd, tSQLExpr* pExpr, char** str) { return tSQLExprLeafToString(pExpr, true, str); } -static int32_t getTablenameCond(SSqlCmd* pCmd, tSQLExpr* pTableCond, char* str) { +static int32_t getTablenameCond(SSqlCmd* pCmd, tSQLExpr* pTableCond, /*char* str*/SStringBuilder* sb) { const char* msg0 = "invalid table name list"; if (pTableCond == NULL) { @@ -3258,9 +3254,9 @@ static int32_t getTablenameCond(SSqlCmd* pCmd, tSQLExpr* pTableCond, char* str) int32_t ret = TSDB_CODE_SUCCESS; if (pTableCond->nSQLOptr == TK_IN) { - ret = tablenameListToString(pRight, str); + ret = tablenameListToString(pRight, sb); } else if (pTableCond->nSQLOptr == TK_LIKE) { - ret = tablenameCondToString(pRight, str); + ret = tablenameCondToString(pRight, sb); } if (ret != TSDB_CODE_SUCCESS) { @@ -3828,8 +3824,7 @@ int tableNameCompar(const void* lhs, const void* rhs) { return ret > 0 ? 1 : -1; } -static int32_t setTableCondForMetricQuery(SSqlObj* pSql, tSQLExpr* pExpr, int16_t tableCondIndex, - char* tmpTableCondBuf) { +static int32_t setTableCondForMetricQuery(SSqlObj* pSql, tSQLExpr* pExpr, int16_t tableCondIndex, SStringBuilder* sb) { SSqlCmd* pCmd = &pSql->cmd; const char* msg = "meter name too long"; @@ -3842,26 +3837,25 @@ static int32_t setTableCondForMetricQuery(SSqlObj* pSql, tSQLExpr* pExpr, int16_ STagCond* pTagCond = &pSql->cmd.tagCond; pTagCond->tbnameCond.uid = pMeterMetaInfo->pMeterMeta->uid; - SString* pTableCond = &pCmd->tagCond.tbnameCond.cond; - SStringAlloc(pTableCond, 4096); - assert(pExpr->nSQLOptr == TK_LIKE || pExpr->nSQLOptr == TK_IN); if (pExpr->nSQLOptr == TK_LIKE) { - strcpy(pTableCond->z, tmpTableCondBuf); - pTableCond->n = strlen(pTableCond->z); + char* str = taosStringBuilderGetResult(sb, NULL); + pCmd->tagCond.tbnameCond.cond = strdup(str); return TSDB_CODE_SUCCESS; } - strcpy(pTableCond->z, QUERY_COND_REL_PREFIX_IN); - pTableCond->n += strlen(QUERY_COND_REL_PREFIX_IN); + SStringBuilder sb1 = {0}; + taosStringBuilderAppendStringLen(&sb1, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN); char db[TSDB_METER_ID_LEN] = {0}; // remove the duplicated input table names int32_t num = 0; - char** segments = strsplit(tmpTableCondBuf + QUERY_COND_REL_PREFIX_IN_LEN, TBNAME_LIST_SEP, &num); - qsort(segments, num, sizeof(void*), tableNameCompar); + char* tableNameString = taosStringBuilderGetResult(sb, NULL); + + char** segments = strsplit(tableNameString + QUERY_COND_REL_PREFIX_IN_LEN, TBNAME_LIST_SEP, &num); + qsort(segments, num, POINTER_BYTES, tableNameCompar); int32_t j = 1; for (int32_t i = 1; i < num; ++i) { @@ -3875,25 +3869,30 @@ static int32_t setTableCondForMetricQuery(SSqlObj* pSql, tSQLExpr* pExpr, int16_ char* acc = getAccountId(pSql); for (int32_t i = 0; i < num; ++i) { - SStringEnsureRemain(pTableCond, TSDB_METER_ID_LEN); - if (i >= 1) { - pTableCond->z[pTableCond->n++] = TBNAME_LIST_SEP[0]; + taosStringBuilderAppendStringLen(&sb1, TBNAME_LIST_SEP, 1); } - + + char idBuf[TSDB_METER_ID_LEN + 1] = {0}; int32_t xlen = strlen(segments[i]); SSQLToken t = {.z = segments[i], .n = xlen, .type = TK_STRING}; - int32_t ret = setObjFullName(pTableCond->z + pTableCond->n, acc, &dbToken, &t, &xlen); + int32_t ret = setObjFullName(idBuf, acc, &dbToken, &t, &xlen); if (ret != TSDB_CODE_SUCCESS) { + taosStringBuilderDestroy(&sb1); tfree(segments); + invalidSqlErrMsg(pCmd, msg); return ret; } - - pTableCond->n += xlen; + + taosStringBuilderAppendString(&sb1, idBuf); } - + + char* str = taosStringBuilderGetResult(&sb1, NULL); + pCmd->tagCond.tbnameCond.cond = strdup(str); + + taosStringBuilderDestroy(&sb1); tfree(segments); return TSDB_CODE_SUCCESS; } @@ -4071,10 +4070,9 @@ int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr) SSqlCmd* pCmd = &pSql->cmd; /* - * tags query condition may be larger than 512bytes, - * therefore, we need to prepare enough large space + * tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space */ - char tableNameCond[TSDB_MAX_SQL_LEN] = {0}; + SStringBuilder sb = {0}; int32_t ret = TSDB_CODE_SUCCESS; if ((ret = getQueryCondExpr(pCmd, pExpr, condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) { @@ -4119,7 +4117,7 @@ int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr) } // 4. get the table name query condition - if ((ret = getTablenameCond(pCmd, condExpr->pTableCond, tableNameCond)) != TSDB_CODE_SUCCESS) { + if ((ret = getTablenameCond(pCmd, condExpr->pTableCond, &sb)) != TSDB_CODE_SUCCESS) { return ret; } @@ -4135,7 +4133,10 @@ int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr) // 7. query condition for table name pCmd->tagCond.relType = (condExpr->relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR; - ret = setTableCondForMetricQuery(pSql, condExpr->pTableCond, condExpr->tableCondIndex, tableNameCond); + + ret = setTableCondForMetricQuery(pSql, condExpr->pTableCond, condExpr->tableCondIndex, &sb); + taosStringBuilderDestroy(&sb); + if (!validateFilterExpr(pCmd)) { return invalidSqlErrMsg(pCmd, msg); } @@ -5156,7 +5157,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIn if (pExpr->functionId != TSDB_FUNC_TAG) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); - int16_t columnInfo = tscGetJoinTagColIndexByUid(pCmd, pMeterMetaInfo->pMeterMeta->uid); + int16_t columnInfo = tscGetJoinTagColIndexByUid(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid); SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo}; SSchema* pSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d25b2dde53b4456c0127a3a340c6a0e334451712..bf175c55405d940e0d8da9e6552436193c13d0f5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -689,7 +689,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId SSqlExpr *pExpr = tscSqlExprGet(&pNew->cmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); - int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd, pMeterMetaInfo->pMeterMeta->uid); + int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pMeterMetaInfo->pMeterMeta->uid); pExpr->param->i64Key = tagColIndex; pExpr->numOfParams = 1; @@ -2741,10 +2741,14 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) { int32_t n = 0; for (int32_t i = 0; i < pCmd->tagCond.numOfTagCond; ++i) { - n += pCmd->tagCond.cond[i].cond.n; + n += strlen(pCmd->tagCond.cond[i].cond); } - int32_t tagLen = n * TSDB_NCHAR_SIZE + pCmd->tagCond.tbnameCond.cond.n * TSDB_NCHAR_SIZE; + int32_t tagLen = n * TSDB_NCHAR_SIZE; + if (pCmd->tagCond.tbnameCond.cond != NULL) { + tagLen += strlen(pCmd->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE; + } + int32_t joinCondLen = (TSDB_METER_ID_LEN + sizeof(int16_t)) * 2; int32_t elemSize = sizeof(SMetricMetaElemMsg) * pCmd->numOfTables; @@ -2816,8 +2820,9 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) { if (pTagCond->numOfTagCond > 0) { SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid); if (pCond != NULL) { - condLen = pCond->cond.n + 1; - bool ret = taosMbsToUcs4(pCond->cond.z, pCond->cond.n, pMsg, pCond->cond.n * TSDB_NCHAR_SIZE); + condLen = strlen(pCond->cond) + 1; + + bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE); if (!ret) { tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid)); return 0; @@ -2836,15 +2841,17 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) { offset = pMsg - (char *)pMetaMsg; pElem->tableCond = htonl(offset); - pElem->tableCondLen = htonl(pTagCond->tbnameCond.cond.n); + + uint32_t len = strlen(pTagCond->tbnameCond.cond); + pElem->tableCondLen = htonl(len); - memcpy(pMsg, pTagCond->tbnameCond.cond.z, pTagCond->tbnameCond.cond.n); - pMsg += pTagCond->tbnameCond.cond.n; + memcpy(pMsg, pTagCond->tbnameCond.cond, len); + pMsg += len; } SSqlGroupbyExpr *pGroupby = &pCmd->groupbyExpr; - if (pGroupby->tableIndex != i) { + if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) { pElem->orderType = 0; pElem->orderIndex = 0; pElem->numOfGroupCols = 0; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 233a37dc5930f8b5d16d608882ced99148493c4f..4d7f2734a940500f04af0c2cf1abb4ac8f6e6335 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -270,7 +270,7 @@ int taos_query(TAOS *taos, const char *sqlstr) { SSqlRes *pRes = &pSql->res; size_t sqlLen = strlen(sqlstr); - if (sqlLen > TSDB_MAX_SQL_LEN) { + if (sqlLen > tsMaxSQLStringLen) { pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); @@ -786,7 +786,6 @@ int taos_errno(TAOS *taos) { char *taos_errstr(TAOS *taos) { STscObj *pObj = (STscObj *)taos; uint8_t code; -// char temp[256] = {0}; if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode]; @@ -797,11 +796,13 @@ char *taos_errstr(TAOS *taos) { // for invalid sql, additional information is attached to explain why the sql is invalid if (code == TSDB_CODE_INVALID_SQL) { -// snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload); -// strcpy(pObj->pSql->cmd.payload, temp); return pObj->pSql->cmd.payload; } else { - return tsError[code]; + if (code < 0 || code > TSDB_CODE_MAX_ERROR_CODE) { + return tsError[TSDB_CODE_SUCCESS]; + } else { + return tsError[code]; + } } } @@ -924,7 +925,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { tscTrace("%p Valid SQL: %s pObj:%p", pSql, sql, pObj); int32_t sqlLen = strlen(sql); - if (sqlLen > TSDB_MAX_SQL_LEN) { + if (sqlLen > tsMaxSQLStringLen) { tscError("%p sql too long", pSql); pRes->code = TSDB_CODE_INVALID_SQL; return pRes->code; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index baafb57f6a57b924b22abe71460ed5cd034ff853..7fd3d7706bc4bf7234593b29f51bff7fc04f417d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -51,7 +51,6 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) { assert(len < tListLen(tagIdBuf)); const int32_t maxKeySize = TSDB_MAX_TAGS_LEN; // allowed max key size - char* tmp = calloc(1, TSDB_MAX_SQL_LEN); SCond* cond = tsGetMetricQueryCondPos(pTagCond, uid); @@ -60,12 +59,24 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) { sprintf(join, "%s,%s", pTagCond->joinInfo.left.meterId, pTagCond->joinInfo.right.meterId); } - int32_t keyLen = - snprintf(tmp, TSDB_MAX_SQL_LEN, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name, - (cond != NULL ? cond->cond.z : NULL), pTagCond->tbnameCond.cond.n > 0 ? pTagCond->tbnameCond.cond.z : NULL, + // estimate the buffer size + size_t tbnameCondLen = pTagCond->tbnameCond.cond != NULL? strlen(pTagCond->tbnameCond.cond):0; + size_t redundantLen = 20; + + size_t bufSize = strlen(pMeterMetaInfo->name) + tbnameCondLen + strlen(join) + strlen(tagIdBuf); + if (cond != NULL) { + bufSize += strlen(cond->cond); + } + + bufSize = (size_t) ((bufSize + redundantLen) * 1.5); + char* tmp = calloc(1, bufSize); + + int32_t keyLen = snprintf(tmp, bufSize, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name, + (cond != NULL ? cond->cond : NULL), + (tbnameCondLen > 0 ? pTagCond->tbnameCond.cond : NULL), pTagCond->relType, join, tagIdBuf, pCmd->groupbyExpr.orderType); - assert(keyLen <= TSDB_MAX_SQL_LEN); + assert(keyLen <= bufSize); if (keyLen < maxKeySize) { strcpy(str, tmp); @@ -99,7 +110,7 @@ void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) { SCond* pDest = &pTagCond->cond[pTagCond->numOfTagCond]; pDest->uid = uid; - pDest->cond = SStringCreate(str); + pDest->cond = strdup(str); pTagCond->numOfTagCond += 1; } @@ -1340,14 +1351,20 @@ bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId) { void tscTagCondCopy(STagCond* dest, const STagCond* src) { memset(dest, 0, sizeof(STagCond)); + + if (src->tbnameCond.cond != NULL) { + dest->tbnameCond.cond = strdup(src->tbnameCond.cond); + } - SStringCopy(&dest->tbnameCond.cond, &src->tbnameCond.cond); dest->tbnameCond.uid = src->tbnameCond.uid; memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo)); for (int32_t i = 0; i < src->numOfTagCond; ++i) { - SStringCopy(&dest->cond[i].cond, &src->cond[i].cond); + if (src->cond[i].cond != NULL) { + dest->cond[i].cond = strdup(src->cond[i].cond); + } + dest->cond[i].uid = src->cond[i].uid; } @@ -1356,10 +1373,9 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) { } void tscTagCondRelease(STagCond* pCond) { - SStringFree(&pCond->tbnameCond.cond); - + free(pCond->tbnameCond.cond); for (int32_t i = 0; i < pCond->numOfTagCond; ++i) { - SStringFree(&pCond->cond[i].cond); + free(pCond->cond[i].cond); } memset(pCond, 0, sizeof(STagCond)); @@ -1571,123 +1587,6 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { pRes->numOfRows = 0; } -SString SStringCreate(const char* str) { - size_t len = strlen(str); - - SString dest = {.n = len, .alloc = len + 1}; - dest.z = calloc(1, dest.alloc); - strcpy(dest.z, str); - - return dest; -} - -void SStringCopy(SString* pDest, const SString* pSrc) { - if (pSrc->n > 0) { - pDest->n = pSrc->n; - pDest->alloc = pDest->n + 1; // one additional space for null terminate - - pDest->z = calloc(1, pDest->alloc); - - memcpy(pDest->z, pSrc->z, pDest->n); - } else { - memset(pDest, 0, sizeof(SString)); - } -} - -void SStringFree(SString* pStr) { - if (pStr->alloc > 0) { - tfree(pStr->z); - pStr->alloc = 0; - } -} - -void SStringShrink(SString* pStr) { - if (pStr->alloc > (pStr->n + 1) && pStr->alloc > (pStr->n * 2)) { - pStr->z = realloc(pStr->z, pStr->n + 1); - assert(pStr->z != NULL); - - pStr->alloc = pStr->n + 1; - } -} - -int32_t SStringAlloc(SString* pStr, int32_t size) { - if (pStr->alloc >= size) { - return TSDB_CODE_SUCCESS; - } - - size = ALIGN8(size); - - char* tmp = NULL; - if (pStr->z != NULL) { - tmp = realloc(pStr->z, size); - memset(pStr->z + pStr->n, 0, size - pStr->n); - } else { - tmp = calloc(1, size); - } - - if (tmp == NULL) { -#ifdef WINDOWS - LPVOID lpMsgBuf; - FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, - GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR)&lpMsgBuf, 0, NULL); - tscTrace("failed to allocate memory, reason:%s", lpMsgBuf); - LocalFree(lpMsgBuf); -#else - char errmsg[256] = {0}; - strerror_r(errno, errmsg, tListLen(errmsg)); - tscTrace("failed to allocate memory, reason:%s", errmsg); -#endif - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - pStr->z = tmp; - pStr->alloc = size; - - return TSDB_CODE_SUCCESS; -} - -#define MIN_ALLOC_SIZE 8 - -int32_t SStringEnsureRemain(SString* pStr, int32_t size) { - if (pStr->alloc - pStr->n > size) { - return TSDB_CODE_SUCCESS; - } - - // remain space is insufficient, allocate more spaces - int32_t inc = (size >= MIN_ALLOC_SIZE) ? size : MIN_ALLOC_SIZE; - if (inc < (pStr->alloc >> 1)) { - inc = (pStr->alloc >> 1); - } - - // get the new size - int32_t newsize = pStr->alloc + inc; - - char* tmp = realloc(pStr->z, newsize); - if (tmp == NULL) { -#ifdef WINDOWS - LPVOID lpMsgBuf; - FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, - GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR)&lpMsgBuf, 0, NULL); - tscTrace("failed to allocate memory, reason:%s", lpMsgBuf); - LocalFree(lpMsgBuf); -#else - char errmsg[256] = {0}; - strerror_r(errno, errmsg, tListLen(errmsg)); - tscTrace("failed to allocate memory, reason:%s", errmsg); -#endif - - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - - memset(tmp + pStr->n, 0, inc); - pStr->alloc = newsize; - pStr->z = tmp; - - return TSDB_CODE_SUCCESS; -} - SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) { SSqlCmd* pCmd = &pSql->cmd; @@ -1822,9 +1721,7 @@ void tscDoQuery(SSqlObj* pSql) { } } -int16_t tscGetJoinTagColIndexByUid(SSqlCmd* pCmd, uint64_t uid) { - STagCond* pTagCond = &pCmd->tagCond; - +int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) { if (pTagCond->joinInfo.left.uid == uid) { return pTagCond->joinInfo.left.tagCol; } else { diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 5510212dbc37f6ce3ebb33c43d34ca93fc0649c1..5fee1d7da563eac57f3cb482ca12869e2b8bafe7 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -128,7 +128,7 @@ extern "C" { #define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered #define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered #define TSDB_CODE_INVALID_COMMIT_LOG 109 // commit log init failed -#define TSDB_CODE_SERVER_NO_SPACE 110 +#define TSDB_CODE_SERV_NO_DISKSPACE 110 #define TSDB_CODE_NOT_SUPER_TABLE 111 // operation only available for super table #define TSDB_CODE_DUPLICATE_TAGS 112 // tags value for join not unique #define TSDB_CODE_INVALID_SUBMIT_MSG 113 @@ -137,6 +137,8 @@ extern "C" { #define TSDB_CODE_INVALID_VNODE_STATUS 116 #define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117 +#define TSDB_CODE_MAX_ERROR_CODE 118 + #ifdef __cplusplus } #endif diff --git a/src/inc/tglobalcfg.h b/src/inc/tglobalcfg.h index ede3c97ce9f874152e8b81d78b6b1b19adc49b49..ed91964d611ccd4e578007794fcf9c9940123cf0 100644 --- a/src/inc/tglobalcfg.h +++ b/src/inc/tglobalcfg.h @@ -106,7 +106,6 @@ extern int tsMaxDbs; extern int tsMaxTables; extern int tsMaxDnodes; extern int tsMaxVGroups; -extern int tsShellActivityTimer; extern char tsMgmtZone[]; extern char tsLocalIp[]; @@ -127,6 +126,7 @@ extern int tsEnableHttpModule; extern int tsEnableMonitorModule; extern int tsRestRowLimit; extern int tsCompressMsgSize; +extern int tsMaxSQLStringLen; extern char tsSocketType[4]; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 8b057654c68a4c8f39bf541e07f1e171ebed0091..1190dc8420fe360a667c2d932cfc8a20e4321015 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -100,6 +100,7 @@ extern "C" { #define TSDB_COL_NAME_LEN 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 16 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE +#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 6mb #define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS * 16 #define TSDB_MAX_TAGS_LEN 512 diff --git a/src/rpc/src/tstring.c b/src/rpc/src/tstring.c index e3daca130a83c1c363dad3eeb9bef1d2394842b2..63f65b882f59b07b434d908e33aba455c43b5178 100644 --- a/src/rpc/src/tstring.c +++ b/src/rpc/src/tstring.c @@ -238,7 +238,7 @@ char *tsError[] = {"success", "only super table has metric meta info", "tags value not unique for join", "invalid submit message", - "not active table(not created yet or deleted already)", //114 + "not active table(not created yet or dropped already)", //114 "invalid table id", "invalid vnode status", //116 "failed to lock resources", diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 35279ca011f1a8aeec012e1d2311862dde2f95ca..2002483f03136fcc65d0bb1727ac169d12473db7 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -27,7 +27,13 @@ extern "C" { #define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query)) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0) +/* + * set the output buffer page size is 16k + * The page size should be sufficient for at least one output result or intermediate result. + * Some intermediate results may be extremely large, such as top/bottom(100) query. + */ #define DEFAULT_INTERN_BUF_SIZE 16384L + #define INIT_ALLOCATE_DISK_PAGES 60L #define DEFAULT_DATA_FILE_MAPPING_PAGES 2L #define DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE (DEFAULT_DATA_FILE_MAPPING_PAGES * DEFAULT_INTERN_BUF_SIZE) @@ -160,7 +166,7 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport); void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport); int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position); -void doCloseAllOpenedResults(SMeterQuerySupportObj* pSupporter); +int32_t doCloseAllOpenedResults(SMeterQuerySupportObj* pSupporter); void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); @@ -185,7 +191,7 @@ void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len); void setExecutionContext(SMeterQuerySupportObj* pSupporter, SOutputRes* outputRes, int32_t meterIdx, int32_t groupIdx, SMeterQueryInfo* sqinfo); -void setIntervalQueryExecutionContext(SMeterQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo); +int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo); int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange); int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus); @@ -224,11 +230,11 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY /** * add the new allocated disk page to meter query info * the new allocated disk page is used to keep the intermediate (interval) results - * + * @param pQuery * @param pMeterQueryInfo * @param pSupporter */ -tFilePage* addDataPageForMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter); +tFilePage* addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter); /** * save the query range data into SMeterQueryInfo diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 9620ab38dcbc357f1649a9b909b1abecb4c73233..5174753d8224a15b20af798844e50057cc13b2ce 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -67,13 +67,13 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, char *sdata, SField *pFields, __block_search_fn_t searchFn); -static void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); +static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields, __block_search_fn_t searchFn); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); -static void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, +static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv); static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); @@ -413,7 +413,7 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int vnodeSetOpenedFileNames(pVnodeFileInfo); if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) { - doCloseOpenedFileData(pVnodeFileInfo); // there may be partially open fd, close it anyway. + doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway. return pVnodeFileInfo->pHeaderFileData; } } @@ -1291,9 +1291,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - // if (!functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - // continue; - // } SField dummyField = {0}; @@ -3052,7 +3049,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); if (pDir == NULL) { - dError("QInfo:%p failed to open directory:%s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix); + dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix, strerror(errno)); return; } @@ -3920,11 +3917,16 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - - // set 4k page for each meter + pSupporter->numOfPages = pSupporter->numOfMeters; - ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); + ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); + if (ret != TSDB_CODE_SUCCESS) { + dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, + strerror(errno)); + return TSDB_CODE_SERV_NO_DISKSPACE; + } + pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; pSupporter->lastPageId = -1; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; @@ -3932,7 +3934,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pSupporter->meterOutputMMapBuf = mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { - dError("QInfo:%p failed to map data file: %s to disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); + dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); return TSDB_CODE_SERV_OUT_OF_MEMORY; } } @@ -4733,20 +4735,24 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) { SQuery * pQuery = pRuntimeEnv->pQuery; int64_t st = taosGetTimestampMs(); + int32_t ret = TSDB_CODE_SUCCESS; while (pSupporter->subgroupIdx < pSupporter->pSidSet->numOfSubSet) { int32_t start = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx]; int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1]; - int32_t ret = - doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end); + ret = doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end); + if (ret < 0) { // not enough disk space to save the data into disk + return -1; + } + pSupporter->subgroupIdx += 1; - /* this group generates at least one result, return results */ + // this group generates at least one result, return results if (ret > 0) { break; } - + assert(pSupporter->numOfGroupResultPages == 0); dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1); } @@ -4754,7 +4760,7 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) { dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1, pSupporter->pSidSet->numOfSubSet, taosGetTimestampMs() - st); - return pSupporter->numOfGroupResultPages; + return TSDB_CODE_SUCCESS; } void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) { @@ -4762,7 +4768,9 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) pSupporter->numOfGroupResultPages = 0; // current results of group has been sent to client, try next group - mergeMetersResultToOneGroups(pSupporter); + if (mergeMetersResultToOneGroups(pSupporter) != TSDB_CODE_SUCCESS) { + return; // failed to save data in the disk + } // set current query completed if (pSupporter->numOfGroupResultPages == 0 && pSupporter->subgroupIdx == pSupporter->pSidSet->numOfSubSet) { @@ -4840,7 +4848,10 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery } else { // copy data to disk buffer if (buffer[0]->numOfElems == pQuery->pointsToRead) { - flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv); + if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { + return -1; + } + resetMergeResultBuf(pQuery, pCtx); } @@ -4887,7 +4898,14 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery } if (buffer[0]->numOfElems != 0) { // there are data in buffer - flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv); + if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { + dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), pSupporter->extBufFile); + tfree(pTree); + tfree(pValidMeter); + tfree(posArray); + + return -1; + } } int64_t endt = taosGetTimestampMs(); @@ -4906,25 +4924,44 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery return pSupporter->numOfGroupResultPages; } -static void extendDiskBuf(SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { +static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); - + + SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pQuery); + int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); pSupporter->numOfPages = numOfPages; - // disk-based output buffer is exhausted, try to extend the disk-based buffer + /* + * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may + * be insufficient + */ ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); if (ret != 0) { - perror("error in allocate the disk-based buffer"); - return; + dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, + strerror(errno)); + pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE; + pQInfo->killed = 1; + + return pQInfo->code; } pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; pSupporter->meterOutputMMapBuf = mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); + + if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { + dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; + pQInfo->killed = 1; + + return pQInfo->code; + } + + return TSDB_CODE_SUCCESS; } -void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { +int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); @@ -4935,7 +4972,9 @@ void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, requiredPages += pSupporter->numOfMeters; } - extendDiskBuf(pSupporter, requiredPages); + if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) { + return -1; + } } char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages + @@ -4949,6 +4988,7 @@ void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, } pSupporter->numOfGroupResultPages += 1; + return TSDB_CODE_SUCCESS; } void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx) { @@ -4966,7 +5006,7 @@ void setMeterDataInfo(SMeterDataInfo *pMeterDataInfo, SMeterObj *pMeterObj, int3 pMeterDataInfo->meterOrderIdx = meterIdx; } -void doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { +int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -4980,11 +5020,20 @@ void doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { pRuntimeEnv->pMeterObj = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[index]->sid); assert(pRuntimeEnv->pMeterObj == pMeterInfo[i].pMeterObj); - setIntervalQueryExecutionContext(pSupporter, i, pMeterInfo[i].pMeterQInfo); - saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows); + int32_t ret = setIntervalQueryExecutionContext(pSupporter, i, pMeterInfo[i].pMeterQInfo); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + + ret = saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } } } } + + return TSDB_CODE_SUCCESS; } void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { @@ -5690,18 +5739,24 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY } } -static tFilePage *allocNewPage(SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { +static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { - extendDiskBuf(pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters); + if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) { + return NULL; + } } *pageId = (++pSupporter->lastPageId); return getFilePage(pSupporter, *pageId); } -tFilePage *addDataPageForMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) { +tFilePage *addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) { uint32_t pageId = 0; - tFilePage *pPage = allocNewPage(pSupporter, &pageId); + + tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId); + if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results + return NULL; + } if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) { pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1; @@ -6199,46 +6254,53 @@ void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t } } -void setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) { +int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; tFilePage * pData = NULL; + SQuery* pQuery = pRuntimeEnv->pQuery; + // in the first scan, new space needed for results if (pMeterQueryInfo->numOfPages == 0) { - pData = addDataPageForMeterQueryInfo(pMeterQueryInfo, pSupporter); + pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter); } else { int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]; pData = getFilePage(pSupporter, lastPageId); if (pData->numOfElems >= pRuntimeEnv->numOfRowsPerPage) { - pData = addDataPageForMeterQueryInfo(pMeterQueryInfo, pSupporter); - assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer + pData = addDataPageForMeterQueryInfo(pRuntimeEnv->pQuery, pMeterQueryInfo, pSupporter); + if (pData != NULL) { + assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer + } } } + + if (pData == NULL) { + return -1; + } for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i); pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i]; } + + return TSDB_CODE_SUCCESS; } -void setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx, +int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; if (IS_MASTER_SCAN(pRuntimeEnv)) { - setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); + if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) { + // not enough disk space or memory buffer for intermediate results + return -1; + } if (pMeterQueryInfo->lastResRows == 0) { initCtxOutputBuf(pRuntimeEnv); } - // reset the number of iterated elements, once this function is called. since the pCtx for different - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - // pRuntimeEnv->pCtx[j].numOfIteratedElems = 0; - } - } else { if (pMeterQueryInfo->reverseFillRes) { setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo); @@ -6249,7 +6311,9 @@ void setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t * * If the master scan does not produce any results, new spaces needed to be allocated during supplement scan */ - setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); + if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) { + return -1; + } } } @@ -6659,14 +6723,14 @@ static void validateResultBuf(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo } } -void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult) { +int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; // no results generated, do nothing for master scan if (numOfResult <= 0) { if (IS_MASTER_SCAN(pRuntimeEnv)) { - return; + return TSDB_CODE_SUCCESS; } else { /* * There is a case that no result generated during the the supplement scan, and during the main @@ -6691,7 +6755,7 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo); } - return; + return TSDB_CODE_SUCCESS; } } @@ -6720,7 +6784,9 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI pMeterQueryInfo->numOfRes += numOfResult; assert(pData->numOfElems <= pRuntimeEnv->numOfRowsPerPage); - setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); + if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) { + return -1; + } for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { resetResultInfo(&pMeterQueryInfo->resultInfo[i]); @@ -6743,6 +6809,8 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage); #endif } + + return TSDB_CODE_SUCCESS; } static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index dea865e5cdf2ba833b5f49c96fb98033e752e550..cec76d1cba836d3161d32240b1ccf3af91230fa5 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -157,7 +157,11 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe setExecutionContext(pSupporter, pSupporter->pResult, k, pMeterInfo[k].groupIdx, pMeterQueryInfo); } else { - setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo); + int32_t ret = setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo); + if (ret != TSDB_CODE_SUCCESS) { + pQInfo->killed = 1; + return NULL; + } } qTrace("QInfo:%p vid:%d sid:%d id:%s, query in cache, qrange:%lld-%lld, lastKey:%lld", pQInfo, pMeterObj->vnode, @@ -306,7 +310,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe if (pReqMeterDataInfo == NULL) { dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); - pQInfo->code = TSDB_CODE_SERV_OUT_OF_MEMORY; + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->killed = 1; return NULL; } @@ -338,7 +342,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); tfree(pReqMeterDataInfo); - pQInfo->code = TSDB_CODE_SERV_OUT_OF_MEMORY; + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->killed = 1; return NULL; } @@ -393,7 +397,12 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe setExecutionContext(pSupporter, pSupporter->pResult, pOneMeterDataInfo->meterOrderIdx, pOneMeterDataInfo->groupIdx, pMeterQueryInfo); } else { // interval query - setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); + int32_t ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); + if (ret != TSDB_CODE_SUCCESS) { + tfree(pReqMeterDataInfo); // error code has been set + pQInfo->killed = 1; + return NULL; + } } SCompBlock *pBlock = pInfoEx->pBlock.compBlock; @@ -900,7 +909,12 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st, pQuery->order.order ^ 1); - doCloseAllOpenedResults(pSupporter); + // failed to save all intermediate results into disk, abort further query processing + if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) { + dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo); + return; + } + doMultiMeterSupplementaryScan(pQInfo); if (isQueryKilled(pQuery)) { @@ -911,12 +925,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { if (pQuery->nAggTimeInterval > 0) { assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); - mergeMetersResultToOneGroups(pSupporter); - copyResToQueryResultBuf(pSupporter, pQuery); - + if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) { + copyResToQueryResultBuf(pSupporter, pQuery); + #ifdef _DEBUG_VIEW - displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); + displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); #endif + } } else { // not a interval query copyFromGroupBuf(pQInfo, pSupporter->pResult); } diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 81e4f6e370ae0a8591ec7c63fcdabc53732df2ec..d6f0796121ddc180325ad4cdf8e9012bd35cebfa 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -824,11 +824,11 @@ int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t * } if (pQInfo->killed) { - dTrace("QInfo:%p it is already killed, %p, code:%d", pQInfo, pQuery, pQInfo->code); + dTrace("QInfo:%p query is killed, %p, code:%d", pQInfo, pQuery, pQInfo->code); if (pQInfo->code == TSDB_CODE_SUCCESS) { return TSDB_CODE_QUERY_CANCELLED; } else { // in case of not TSDB_CODE_SUCCESS, return the code to client - return pQInfo->code; + return abs(pQInfo->code); } } @@ -837,8 +837,13 @@ int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t * *rowSize = pQuery->rowSize; *timePrec = vnodeList[pQInfo->pObj->vnode].cfg.precision; - - if (pQInfo->code < 0) return -pQInfo->code; + + dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec, + *rowSize, *numOfRows, pQInfo->code); + + if (pQInfo->code < 0) { // less than 0 means there are error existed. + return -pQInfo->code; + } return TSDB_CODE_SUCCESS; } diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index f4b3cdbbe72a7adc2321987335f861ce9b2034c2..ba9e682f8b775e082542ebe834ccbd2f8ab1a52b 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -606,7 +606,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { if (tsAvailDataDirGB < tsMinimalDataDirGB) { dError("server disk space remain %.3f GB, need at least %.3f GB, stop writing", tsAvailDataDirGB, tsMinimalDataDirGB); - code = TSDB_CODE_SERVER_NO_SPACE; + code = TSDB_CODE_SERV_NO_DISKSPACE; goto _submit_over; } diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index cef11d30cba8fe4488a0cc6adcad7b4143f4fe8d..80f76a3d25be300866f4f90c0ebb444242402185 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -124,6 +124,7 @@ int tsMgmtEqualVnodeNum = 0; int tsEnableHttpModule = 1; int tsEnableMonitorModule = 1; int tsRestRowLimit = 10240; +int tsMaxSQLStringLen = TSDB_MAX_SQL_LEN; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -653,7 +654,11 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "compressMsgSize", &tsCompressMsgSize, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW, -1, 10000000, 0, TSDB_CFG_UTYPE_NONE); - + + tsInitConfigOption(cfg++, "maxSQLLength", &tsMaxSQLStringLen, TSDB_CFG_VTYPE_INT, + TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW, + TSDB_MAX_SQL_LEN, TSDB_MAX_ALLOWED_SQL_LEN, 0, TSDB_CFG_UTYPE_BYTE); + // locale & charset tsInitConfigOption(cfg++, "timezone", tsTimezone, TSDB_CFG_VTYPE_STRING, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT, diff --git a/src/util/src/tstrbuild.c b/src/util/src/tstrbuild.c index 6fb970bd6e3de13d6119089759fcf030d6c618fc..691435749317f637b28140a30808f2896c8c04f6 100644 --- a/src/util/src/tstrbuild.c +++ b/src/util/src/tstrbuild.c @@ -13,10 +13,8 @@ * along with this program. If not, see . */ +#include "os.h" #include "tstrbuild.h" -#include -#include -#include void taosStringBuilderEnsureCapacity(SStringBuilder* sb, size_t size) { size += sb->pos;