提交 7feeea97 编写于 作者: X Xiaoyu Wang

feat: add scenarios that trigger metadata refresh

上级 464a79ac
......@@ -182,8 +182,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \
(_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \
(_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
......@@ -194,7 +196,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define REQUEST_MAX_TRY_TIMES 5
#define REQUEST_MAX_TRY_TIMES 1
#define qFatal(...) \
do { \
......
......@@ -13,18 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cJSON.h"
#include "clientInt.h"
#include "clientLog.h"
#include "command.h"
#include "scheduler.h"
#include "tdatablock.h"
#include "tdataformat.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
#include "cJSON.h"
#include "tdataformat.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
......@@ -189,7 +189,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
}
}
if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
TSWAP(pRequest->dbList, (*pQuery)->pDbList);
TSWAP(pRequest->tableList, (*pQuery)->pTableList);
}
......@@ -293,7 +294,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res);
pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
......@@ -483,7 +484,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t retryNum = 0;
int32_t code = 0;
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
do {
destroyRequest(pRequest);
pRequest = launchQuery(pTscObj, sql, sqlLen);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
......@@ -494,9 +496,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
pRequest->code = code;
break;
}
destroyRequest(pRequest);
}
} while (retryNum++ < REQUEST_MAX_TRY_TIMES);
return pRequest;
}
......@@ -805,21 +805,20 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS;
}
static char* parseTagDatatoJson(void *p){
char* string = NULL;
cJSON *json = cJSON_CreateObject();
if (json == NULL)
{
static char* parseTagDatatoJson(void* p) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
goto end;
}
int16_t nCols = kvRowNCols(p);
char tagJsonKey[256] = {0};
char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) {
SColIdx * pColIdx = kvRowColIdxAt(p, j);
char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0){
if(*val == TSDB_DATA_TYPE_NULL){
SColIdx* pColIdx = kvRowColIdxAt(p, j);
char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0) {
if (*val == TSDB_DATA_TYPE_NULL) {
string = taosMemoryCalloc(1, 8);
sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(string, strlen(varDataVal(string)));
......@@ -834,19 +833,18 @@ static char* parseTagDatatoJson(void *p){
// json value
val += varDataTLen(val);
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
if(type == TSDB_DATA_TYPE_NULL) {
char type = *val;
if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull();
if (value == NULL)
{
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_NCHAR) {
} else if (type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL;
if (varDataLen(realData) > 0){
char *tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (varDataLen(realData) > 0) {
char* tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
taosMemoryFree(tagJsonValue);
......@@ -854,45 +852,41 @@ static char* parseTagDatatoJson(void *p){
}
value = cJSON_CreateString(tagJsonValue);
taosMemoryFree(tagJsonValue);
if (value == NULL)
{
if (value == NULL) {
goto end;
}
}else if(varDataLen(realData) == 0){
} else if (varDataLen(realData) == 0) {
value = cJSON_CreateString("");
}else{
} else {
ASSERT(0);
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_DOUBLE){
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL)
{
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
}else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL)
{
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else{
} else {
ASSERT(0);
}
}
string = cJSON_PrintUnformatted(json);
end:
......@@ -930,7 +924,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
}else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
} else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -943,7 +937,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
int32_t jsonInnerType = *pStart;
char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
......@@ -951,7 +944,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
char *jsonString = parseTagDatatoJson(jsonInnerData);
char* jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString);
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
......
......@@ -41,6 +41,13 @@
sToken = tStrGetToken(pSql, &index, false); \
} while (0)
#define NEXT_VALID_TOKEN(pSql, sToken) \
do { \
sToken.n = tGetToken(pSql, &sToken.type); \
sToken.z = pSql; \
pSql += sToken.n; \
} while (TK_NK_SPACE == sToken.type)
typedef struct SInsertParseContext {
SParseContext* pComCxt; // input
char* pSql; // input
......@@ -482,9 +489,11 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_NK_INTEGER) {
return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
param);
} else if (pToken->type == TK_NK_FLOAT) {
return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
param);
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
......@@ -685,7 +694,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
isOrdered = false;
}
if (index < 0) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z);
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
}
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
......@@ -895,8 +904,10 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
}
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) {
NEXT_VALID_TOKEN(pCxt->pSql, sToken);
if (TK_NK_COMMA == sToken.type) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
} else if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
}
......@@ -996,8 +1007,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
pDataBlock->size += extendedRowSize; // len;
}
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) {
NEXT_VALID_TOKEN(pCxt->pSql, sToken);
if (TK_NK_COMMA == sToken.type) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
} else if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
}
......@@ -1057,10 +1070,10 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0;
char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false;
STableMeta *pMeta = NULL;
int32_t tbNum = 0;
char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false;
STableMeta* pMeta = NULL;
// for each table
while (1) {
......@@ -1121,7 +1134,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
&dataBuf, NULL, &pCxt->createTblReq));
pMeta = pCxt->pTableMeta;
pCxt->pTableMeta = NULL;
if (TK_NK_LP == sToken.type) {
// pSql -> field1_name, ...)
CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pMeta)));
......@@ -1160,7 +1173,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
pCxt->pTableBlockHashObj);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pVgroupsHashObj = NULL;
......@@ -1231,14 +1245,14 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context);
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBody(&context);
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
while (NULL != pTable) {
taosArrayPush((*pQuery)->pTableList, pTable);
......@@ -1579,9 +1593,9 @@ typedef struct SmlExecTableHandle {
} SmlExecTableHandle;
typedef struct SmlExecHandle {
SHashObj* pBlockHash;
SmlExecTableHandle tableExecHandle;
SQuery *pQuery;
SHashObj* pBlockHash;
SmlExecTableHandle tableExecHandle;
SQuery* pQuery;
} SSmlExecHandle;
static void smlDestroyTableHandle(void* pHandle) {
......@@ -1673,9 +1687,9 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1
param.schema = pTagSchema;
SSmlKv* kv = taosArrayGetP(cols, i);
if(IS_VAR_DATA_TYPE(kv->type)){
if (IS_VAR_DATA_TYPE(kv->type)) {
KvRowAppend(msg, kv->value, kv->length, &param);
}else{
} else {
KvRowAppend(msg, &(kv->value), kv->length, &param);
}
}
......@@ -1688,13 +1702,13 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
return TSDB_CODE_SUCCESS;
}
int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols, bool format,
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
char* tableName, char* msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -1702,7 +1716,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
return ret;
}
SKVRow row = NULL;
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema, &row, &pBuf);
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema,
&row, &pBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -1733,7 +1748,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
int32_t rowNum = taosArrayGetSize(cols);
if(rowNum <= 0) {
if (rowNum <= 0) {
return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
}
ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
......@@ -1744,9 +1759,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
for (int32_t r = 0; r < rowNum; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
void *rowData = taosArrayGetP(cols, r);
void* rowData = taosArrayGetP(cols, r);
size_t rowDataSize = 0;
if(format){
if (format) {
rowDataSize = taosArrayGetSize(rowData);
}
......@@ -1781,9 +1796,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
}
if(IS_VAR_DATA_TYPE(kv->type)){
if (IS_VAR_DATA_TYPE(kv->type)) {
MemRowAppend(&pBuf, kv->value, colLen, &param);
}else{
} else {
MemRowAppend(&pBuf, &(kv->value), colLen, &param);
}
}
......
......@@ -120,6 +120,20 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const
return getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name), pMeta);
}
static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
STableMeta** pMeta) {
SParseContext* pParCxt = pCxt->pParseCxt;
SName name;
toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name);
int32_t code =
catalogRefreshGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pMeta, false);
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName,
pTableName);
}
return code;
}
static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
......@@ -3201,7 +3215,7 @@ static int32_t translateExplain(STranslateContext* pCxt, SExplainStmt* pStmt) {
}
static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt) {
return getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta);
return refreshGetTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta);
}
static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册