/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "os.h" #include "taosmsg.h" #include "taosdef.h" #include "tcache.h" #include "tname.h" #include "tscLog.h" #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength); static int32_t getToStringLength(const char *pData, int32_t length, int32_t type) { char buf[512] = {0}; int32_t len = 0; int32_t MAX_BOOL_TYPE_LENGTH = 5; // max(strlen("true"), strlen("false")); switch (type) { case TSDB_DATA_TYPE_BINARY: return length; case TSDB_DATA_TYPE_NCHAR: return length; case TSDB_DATA_TYPE_DOUBLE: { double dv = 0; dv = GET_DOUBLE_VAL(pData); len = sprintf(buf, "%lf", dv); if (strncasecmp("nan", buf, 3) == 0) { len = 4; } } break; case TSDB_DATA_TYPE_FLOAT: { float fv = 0; fv = GET_FLOAT_VAL(pData); len = sprintf(buf, "%f", fv); if (strncasecmp("nan", buf, 3) == 0) { len = 4; } } break; case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: len = sprintf(buf, "%" PRId64, *(int64_t *)pData); break; case TSDB_DATA_TYPE_BOOL: len = MAX_BOOL_TYPE_LENGTH; break; default: len = sprintf(buf, "%d", *(int32_t *)pData); break; }; return len; } /* * we need to convert all data into string, so we need to sprintf all kinds of * non-string data into string, and record its length to get the right * maximum length. The length may be less or greater than its original binary length: * For example: * length((short) 1) == 1, less than sizeof(short) * length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t) */ static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) { STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta; if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE || pMeta->tableType == TSDB_STREAM_TABLE) { return 0; } char * pTagValue = tsGetTagsValue(pMeta); SSchema *pTagsSchema = tscGetTableTagSchema(pMeta); int32_t len = getToStringLength(pTagValue, pTagsSchema[0].bytes, pTagsSchema[0].type); pTagValue += pTagsSchema[0].bytes; int32_t numOfTags = tscGetNumOfTags(pMeta); for (int32_t i = 1; i < numOfTags; ++i) { int32_t tLen = getToStringLength(pTagValue, pTagsSchema[i].bytes, pTagsSchema[i].type); if (len < tLen) { len = tLen; } pTagValue += pTagsSchema[i].bytes; } return len; } static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { SSqlRes *pRes = &pSql->res; // one column for each row SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pMeta = pTableMetaInfo->pTableMeta; /* * tagValueCnt is to denote the number of tags columns for meter, not metric. and is to show the column data. * for meter, which is created according to metric, the value of tagValueCnt is not 0, and the numOfTags must be 0. * for metric, the value of tagValueCnt must be 0, but the numOfTags is not 0 */ int32_t numOfRows = tscGetNumOfColumns(pMeta); int32_t totalNumOfRows = numOfRows + tscGetNumOfTags(pMeta); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { numOfRows = numOfRows + tscGetNumOfTags(pMeta); } tscInitResObjForLocalQuery(pSql, totalNumOfRows, rowLen); SSchema *pSchema = tscGetTableSchema(pMeta); for (int32_t i = 0; i < numOfRows; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0); char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i; STR_WITH_MAXSIZE_TO_VARSTR(dst, pSchema[i].name, pField->bytes); char *type = tDataTypeDesc[pSchema[i].type].aName; pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1); dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i; STR_WITH_MAXSIZE_TO_VARSTR(dst, type, pField->bytes); int32_t bytes = pSchema[i].bytes; if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) { bytes -= VARSTR_HEADER_SIZE; if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) { bytes = bytes / TSDB_NCHAR_SIZE; } } pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2); *(int32_t *)(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 2) * totalNumOfRows + pField->bytes * i) = bytes; pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3); if (i >= tscGetNumOfColumns(pMeta) && tscGetNumOfTags(pMeta) != 0) { char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i; const char *src = "TAG"; STR_WITH_MAXSIZE_TO_VARSTR(output, src, pField->bytes); } } if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { return 0; } // the following is handle display tags value for meters created according to metric char *pTagValue = tsGetTagsValue(pMeta); for (int32_t i = numOfRows; i < totalNumOfRows; ++i) { // field name TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0); char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i; STR_WITH_MAXSIZE_TO_VARSTR(output, pSchema[i].name, pField->bytes); // type name pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1); char *type = tDataTypeDesc[pSchema[i].type].aName; output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i; STR_WITH_MAXSIZE_TO_VARSTR(output, type, pField->bytes); // type length int32_t bytes = pSchema[i].bytes; pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2); if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) { bytes -= VARSTR_HEADER_SIZE; if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) { bytes = bytes / TSDB_NCHAR_SIZE; } } *(int32_t *)(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 2) * totalNumOfRows + pField->bytes * i) = bytes; // tag value pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3); char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i; const char *src = "TAG"; STR_WITH_MAXSIZE_TO_VARSTR(target, src, pField->bytes); pTagValue += pSchema[i].bytes; } return 0; } static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, int32_t typeColLength, int32_t noteColLength) { int32_t rowLen = 0; SColumnIndex index = {0}; pSql->cmd.numOfCols = numOfCols; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); pQueryInfo->order.order = TSDB_ORDER_ASC; TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE}; tstrncpy(f.name, "Field", sizeof(f.name)); SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, (TSDB_COL_NAME_LEN - 1), false); rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE); f.bytes = (int16_t)(typeColLength + VARSTR_HEADER_SIZE); f.type = TSDB_DATA_TYPE_BINARY; tstrncpy(f.name, "Type", sizeof(f.name)); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE), typeColLength, false); rowLen += typeColLength + VARSTR_HEADER_SIZE; f.bytes = sizeof(int32_t); f.type = TSDB_DATA_TYPE_INT; tstrncpy(f.name, "Length", sizeof(f.name)); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), sizeof(int32_t), false); rowLen += sizeof(int32_t); f.bytes = (int16_t)(noteColLength + VARSTR_HEADER_SIZE); f.type = TSDB_DATA_TYPE_BINARY; tstrncpy(f.name, "Note", sizeof(f.name)); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE), noteColLength, false); rowLen += noteColLength + VARSTR_HEADER_SIZE; return rowLen; } static int32_t tscProcessDescribeTable(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL); const int32_t NUM_OF_DESC_TABLE_COLUMNS = 4; const int32_t TYPE_COLUMN_LENGTH = 16; const int32_t NOTE_COLUMN_MIN_LENGTH = 8; int32_t noteFieldLen = tscMaxLengthOfTagsFields(pSql); if (noteFieldLen == 0) { noteFieldLen = NOTE_COLUMN_MIN_LENGTH; } int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen); tscFieldInfoUpdateOffset(pQueryInfo); return tscSetValueToResObj(pSql, rowLen); } static int32_t tscProcessCurrentUser(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY; pExpr->resType = TSDB_DATA_TYPE_BINARY; char* vx = calloc(1, pExpr->resBytes); if (vx == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } size_t size = sizeof(pSql->pTscObj->user); STR_WITH_MAXSIZE_TO_VARSTR(vx, pSql->pTscObj->user, size); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); free(vx); return TSDB_CODE_SUCCESS; } static int32_t tscProcessCurrentDB(SSqlObj *pSql) { char db[TSDB_DB_NAME_LEN] = {0}; extractDBName(pSql->pTscObj->db, db); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resType = TSDB_DATA_TYPE_BINARY; size_t t = strlen(db); pExpr->resBytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE; char* vx = calloc(1, pExpr->resBytes); if (vx == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } if (t == 0) { setVardataNull(vx, TSDB_DATA_TYPE_BINARY); } else { STR_WITH_SIZE_TO_VARSTR(vx, db, (VarDataLenT)t); } tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); free(vx); return TSDB_CODE_SUCCESS; } static int32_t tscProcessServerVer(SSqlObj *pSql) { const char* v = pSql->pTscObj->sversion; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resType = TSDB_DATA_TYPE_BINARY; size_t t = strlen(v); pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE); char* vx = calloc(1, pExpr->resBytes); if (vx == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } STR_WITH_SIZE_TO_VARSTR(vx, v, (VarDataLenT)t); tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes); free(vx); return TSDB_CODE_SUCCESS; } static int32_t tscProcessClientVer(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resType = TSDB_DATA_TYPE_BINARY; size_t t = strlen(version); pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE); char* v = calloc(1, pExpr->resBytes); if (v == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } STR_WITH_SIZE_TO_VARSTR(v, version, (VarDataLenT)t); tscSetLocalQueryResult(pSql, v, pExpr->aliasName, pExpr->resType, pExpr->resBytes); free(v); return TSDB_CODE_SUCCESS; } static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; if (pObj->pHb != NULL) { if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; return pSql->res.code; } } else { if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { return pSql->res.code; } } SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); int32_t val = 1; tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t)); return TSDB_CODE_SUCCESS; } void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; pCmd->numOfCols = 1; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pQueryInfo->order.order = TSDB_ORDER_ASC; tscFieldInfoClear(&pQueryInfo->fieldsInfo); pQueryInfo->fieldsInfo.pFields = taosArrayInit(1, sizeof(TAOS_FIELD)); pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(1, sizeof(SFieldSupInfo)); TAOS_FIELD f = tscCreateField((int8_t)type, columnName, (int16_t)valueLength); tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); tscInitResObjForLocalQuery(pSql, 1, (int32_t)valueLength); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0); SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, 0); pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0); memcpy(pRes->data, val, pField->bytes); } int tscProcessLocalCmd(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; if (pCmd->command == TSDB_SQL_CFG_LOCAL) { pRes->code = (uint8_t)taosCfgDynamicOptions(pCmd->payload); } else if (pCmd->command == TSDB_SQL_DESCRIBE_TABLE) { pRes->code = (uint8_t)tscProcessDescribeTable(pSql); } else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { /* * set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to * free allocated resources and remove the SqlObj from sql query linked list */ pRes->qhandle = 0x1; pRes->numOfRows = 0; } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { taosCacheEmpty(tscCacheHandle); pRes->code = TSDB_CODE_SUCCESS; } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { pRes->code = tscProcessServerVer(pSql); } else if (pCmd->command == TSDB_SQL_CLI_VERSION) { pRes->code = tscProcessClientVer(pSql); } else if (pCmd->command == TSDB_SQL_CURRENT_USER) { pRes->code = tscProcessCurrentUser(pSql); } else if (pCmd->command == TSDB_SQL_CURRENT_DB) { pRes->code = tscProcessCurrentDB(pSql); } else if (pCmd->command == TSDB_SQL_SERV_STATUS) { pRes->code = tscProcessServStatus(pSql); } else { pRes->code = TSDB_CODE_TSC_INVALID_SQL; tscError("%p not support command:%d", pSql, pCmd->command); } // keep the code in local variable in order to avoid invalid read in case of async query int32_t code = pRes->code; if (code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, code); } else { tscQueueAsyncRes(pSql); } return code; }