未验证 提交 8dad69b4 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #12766 from taosdata/feature/3.0_wxy

feat: add scenarios that trigger metadata refresh
...@@ -182,8 +182,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -182,8 +182,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \ #define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST) ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \
(_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \
(_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \ #define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED) #define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
...@@ -194,7 +196,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -194,7 +196,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_SCHEDULER_RETRY_ERROR(_code) \ #define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define REQUEST_MAX_TRY_TIMES 5 #define REQUEST_MAX_TRY_TIMES 1
#define qFatal(...) \ #define qFatal(...) \
do { \ do { \
......
...@@ -13,18 +13,18 @@ ...@@ -13,18 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cJSON.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "command.h" #include "command.h"
#include "scheduler.h" #include "scheduler.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdataformat.h"
#include "tdef.h" #include "tdef.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tref.h" #include "tref.h"
#include "cJSON.h"
#include "tdataformat.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
...@@ -189,7 +189,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC ...@@ -189,7 +189,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision); setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
} }
}
if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
TSWAP(pRequest->dbList, (*pQuery)->pDbList); TSWAP(pRequest->dbList, (*pQuery)->pDbList);
TSWAP(pRequest->tableList, (*pQuery)->pTableList); TSWAP(pRequest->tableList, (*pQuery)->pTableList);
} }
...@@ -293,7 +294,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -293,7 +294,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res); pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) { if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob); schedulerFreeJob(pRequest->body.queryJob);
...@@ -483,7 +484,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -483,7 +484,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t retryNum = 0; int32_t retryNum = 0;
int32_t code = 0; int32_t code = 0;
while (retryNum++ < REQUEST_MAX_TRY_TIMES) { do {
destroyRequest(pRequest);
pRequest = launchQuery(pTscObj, sql, sqlLen); pRequest = launchQuery(pTscObj, sql, sqlLen);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break; break;
...@@ -494,9 +496,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -494,9 +496,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
pRequest->code = code; pRequest->code = code;
break; break;
} }
} while (retryNum++ < REQUEST_MAX_TRY_TIMES);
destroyRequest(pRequest);
}
return pRequest; return pRequest;
} }
...@@ -805,21 +805,20 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { ...@@ -805,21 +805,20 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static char* parseTagDatatoJson(void *p){ static char* parseTagDatatoJson(void* p) {
char* string = NULL; char* string = NULL;
cJSON *json = cJSON_CreateObject(); cJSON* json = cJSON_CreateObject();
if (json == NULL) if (json == NULL) {
{
goto end; goto end;
} }
int16_t nCols = kvRowNCols(p); int16_t nCols = kvRowNCols(p);
char tagJsonKey[256] = {0}; char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) { for (int j = 0; j < nCols; ++j) {
SColIdx * pColIdx = kvRowColIdxAt(p, j); SColIdx* pColIdx = kvRowColIdxAt(p, j);
char* val = (char*)(kvRowColVal(p, pColIdx)); char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0){ if (j == 0) {
if(*val == TSDB_DATA_TYPE_NULL){ if (*val == TSDB_DATA_TYPE_NULL) {
string = taosMemoryCalloc(1, 8); string = taosMemoryCalloc(1, 8);
sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L); sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(string, strlen(varDataVal(string))); varDataSetLen(string, strlen(varDataVal(string)));
...@@ -834,19 +833,18 @@ static char* parseTagDatatoJson(void *p){ ...@@ -834,19 +833,18 @@ static char* parseTagDatatoJson(void *p){
// json value // json value
val += varDataTLen(val); val += varDataTLen(val);
char* realData = POINTER_SHIFT(val, CHAR_BYTES); char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val; char type = *val;
if(type == TSDB_DATA_TYPE_NULL) { if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull(); cJSON* value = cJSON_CreateNull();
if (value == NULL) if (value == NULL) {
{
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_NCHAR) { } else if (type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL; cJSON* value = NULL;
if (varDataLen(realData) > 0){ if (varDataLen(realData) > 0) {
char *tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1); char* tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(realData), varDataLen(realData), tagJsonValue); int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (length < 0) { if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val); tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
taosMemoryFree(tagJsonValue); taosMemoryFree(tagJsonValue);
...@@ -854,45 +852,41 @@ static char* parseTagDatatoJson(void *p){ ...@@ -854,45 +852,41 @@ static char* parseTagDatatoJson(void *p){
} }
value = cJSON_CreateString(tagJsonValue); value = cJSON_CreateString(tagJsonValue);
taosMemoryFree(tagJsonValue); taosMemoryFree(tagJsonValue);
if (value == NULL) if (value == NULL) {
{
goto end; goto end;
} }
}else if(varDataLen(realData) == 0){ } else if (varDataLen(realData) == 0) {
value = cJSON_CreateString(""); value = cJSON_CreateString("");
}else{ } else {
ASSERT(0); ASSERT(0);
} }
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_DOUBLE){ } else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData); double jsonVd = *(double*)(realData);
cJSON* value = cJSON_CreateNumber(jsonVd); cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL) if (value == NULL) {
{
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){ // }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData); // int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd); // cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL) // if (value == NULL)
// { // {
// goto end; // goto end;
// } // }
// cJSON_AddItemToObject(json, tagJsonKey, value); // cJSON_AddItemToObject(json, tagJsonKey, value);
}else if (type == TSDB_DATA_TYPE_BOOL) { } else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData); char jsonVd = *(char*)(realData);
cJSON* value = cJSON_CreateBool(jsonVd); cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL) if (value == NULL) {
{
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
}else{ } else {
ASSERT(0); ASSERT(0);
} }
} }
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
end: end:
...@@ -930,7 +924,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -930,7 +924,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i]; pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
pResultInfo->row[i] = pResultInfo->pCol[i].pData; pResultInfo->row[i] = pResultInfo->pCol[i].pData;
}else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) { } else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]); char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -943,7 +937,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -943,7 +937,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (pCol->offset[j] != -1) { if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData; char* pStart = pCol->offset[j] + pCol->pData;
int32_t jsonInnerType = *pStart; int32_t jsonInnerType = *pStart;
char* jsonInnerData = pStart + CHAR_BYTES; char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0}; char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
...@@ -951,7 +944,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -951,7 +944,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) { } else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
char *jsonString = parseTagDatatoJson(jsonInnerData); char* jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString); STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString); taosMemoryFree(jsonString);
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value" } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
......
...@@ -41,6 +41,13 @@ ...@@ -41,6 +41,13 @@
sToken = tStrGetToken(pSql, &index, false); \ sToken = tStrGetToken(pSql, &index, false); \
} while (0) } while (0)
#define NEXT_VALID_TOKEN(pSql, sToken) \
do { \
sToken.n = tGetToken(pSql, &sToken.type); \
sToken.z = pSql; \
pSql += sToken.n; \
} while (TK_NK_SPACE == sToken.type)
typedef struct SInsertParseContext { typedef struct SInsertParseContext {
SParseContext* pComCxt; // input SParseContext* pComCxt; // input
char* pSql; // input char* pSql; // input
...@@ -482,9 +489,11 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int ...@@ -482,9 +489,11 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
} }
} else if (pToken->type == TK_NK_INTEGER) { } else if (pToken->type == TK_NK_INTEGER) {
return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
param);
} else if (pToken->type == TK_NK_FLOAT) { } else if (pToken->type == TK_NK_FLOAT) {
return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
param);
} else { } else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
} }
...@@ -685,7 +694,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* ...@@ -685,7 +694,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
isOrdered = false; isOrdered = false;
} }
if (index < 0) { if (index < 0) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z); return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
} }
if (pColList->cols[index].valStat == VAL_STAT_HAS) { if (pColList->cols[index].valStat == VAL_STAT_HAS) {
return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z); return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
...@@ -895,8 +904,10 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb ...@@ -895,8 +904,10 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
} }
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname)); CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_VALID_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) { if (TK_NK_COMMA == sToken.type) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
} else if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z); return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
} }
...@@ -996,8 +1007,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo ...@@ -996,8 +1007,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
pDataBlock->size += extendedRowSize; // len; pDataBlock->size += extendedRowSize; // len;
} }
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_VALID_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) { if (TK_NK_COMMA == sToken.type) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
} else if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z); return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
} }
...@@ -1057,10 +1070,10 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { ...@@ -1057,10 +1070,10 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...]; // [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) { static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0; int32_t tbNum = 0;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false; bool autoCreateTbl = false;
STableMeta *pMeta = NULL; STableMeta* pMeta = NULL;
// for each table // for each table
while (1) { while (1) {
...@@ -1121,7 +1134,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { ...@@ -1121,7 +1134,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
&dataBuf, NULL, &pCxt->createTblReq)); &dataBuf, NULL, &pCxt->createTblReq));
pMeta = pCxt->pTableMeta; pMeta = pCxt->pTableMeta;
pCxt->pTableMeta = NULL; pCxt->pTableMeta = NULL;
if (TK_NK_LP == sToken.type) { if (TK_NK_LP == sToken.type) {
// pSql -> field1_name, ...) // pSql -> field1_name, ...)
CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pMeta))); CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pMeta)));
...@@ -1160,7 +1173,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { ...@@ -1160,7 +1173,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
pCxt->pTableBlockHashObj);
memset(&pCxt->tags, 0, sizeof(pCxt->tags)); memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pVgroupsHashObj = NULL; pCxt->pVgroupsHashObj = NULL;
...@@ -1231,14 +1245,14 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { ...@@ -1231,14 +1245,14 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
context.pOutput->payloadType = PAYLOAD_TYPE_KV; context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context); int32_t code = skipInsertInto(&context);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBody(&context); code = parseInsertBody(&context);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL); SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
while (NULL != pTable) { while (NULL != pTable) {
taosArrayPush((*pQuery)->pTableList, pTable); taosArrayPush((*pQuery)->pTableList, pTable);
...@@ -1579,9 +1593,9 @@ typedef struct SmlExecTableHandle { ...@@ -1579,9 +1593,9 @@ typedef struct SmlExecTableHandle {
} SmlExecTableHandle; } SmlExecTableHandle;
typedef struct SmlExecHandle { typedef struct SmlExecHandle {
SHashObj* pBlockHash; SHashObj* pBlockHash;
SmlExecTableHandle tableExecHandle; SmlExecTableHandle tableExecHandle;
SQuery *pQuery; SQuery* pQuery;
} SSmlExecHandle; } SSmlExecHandle;
static void smlDestroyTableHandle(void* pHandle) { static void smlDestroyTableHandle(void* pHandle) {
...@@ -1673,9 +1687,9 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD ...@@ -1673,9 +1687,9 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1 SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1
param.schema = pTagSchema; param.schema = pTagSchema;
SSmlKv* kv = taosArrayGetP(cols, i); SSmlKv* kv = taosArrayGetP(cols, i);
if(IS_VAR_DATA_TYPE(kv->type)){ if (IS_VAR_DATA_TYPE(kv->type)) {
KvRowAppend(msg, kv->value, kv->length, &param); KvRowAppend(msg, kv->value, kv->length, &param);
}else{ } else {
KvRowAppend(msg, &(kv->value), kv->length, &param); KvRowAppend(msg, &(kv->value), kv->length, &param);
} }
} }
...@@ -1688,13 +1702,13 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD ...@@ -1688,13 +1702,13 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols, bool format, int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) { char* tableName, char* msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table
SSchema* pTagsSchema = getTableTagSchema(pTableMeta); SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta)); setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema); int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -1702,7 +1716,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols ...@@ -1702,7 +1716,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
return ret; return ret;
} }
SKVRow row = NULL; SKVRow row = NULL;
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema, &row, &pBuf); ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema,
&row, &pBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
...@@ -1733,7 +1748,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols ...@@ -1733,7 +1748,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo); initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
int32_t rowNum = taosArrayGetSize(cols); int32_t rowNum = taosArrayGetSize(cols);
if(rowNum <= 0) { if (rowNum <= 0) {
return buildInvalidOperationMsg(&pBuf, "cols size <= 0"); return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
} }
ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum); ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
...@@ -1744,9 +1759,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols ...@@ -1744,9 +1759,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
for (int32_t r = 0; r < rowNum; ++r) { for (int32_t r = 0; r < rowNum; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row); tdSRowResetBuf(pBuilder, row);
void *rowData = taosArrayGetP(cols, r); void* rowData = taosArrayGetP(cols, r);
size_t rowDataSize = 0; size_t rowDataSize = 0;
if(format){ if (format) {
rowDataSize = taosArrayGetSize(rowData); rowDataSize = taosArrayGetSize(rowData);
} }
...@@ -1781,9 +1796,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols ...@@ -1781,9 +1796,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
} }
if(IS_VAR_DATA_TYPE(kv->type)){ if (IS_VAR_DATA_TYPE(kv->type)) {
MemRowAppend(&pBuf, kv->value, colLen, &param); MemRowAppend(&pBuf, kv->value, colLen, &param);
}else{ } else {
MemRowAppend(&pBuf, &(kv->value), colLen, &param); MemRowAppend(&pBuf, &(kv->value), colLen, &param);
} }
} }
......
...@@ -120,6 +120,20 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const ...@@ -120,6 +120,20 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const
return getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name), pMeta); return getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name), pMeta);
} }
static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
STableMeta** pMeta) {
SParseContext* pParCxt = pCxt->pParseCxt;
SName name;
toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name);
int32_t code =
catalogRefreshGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pMeta, false);
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName,
pTableName);
}
return code;
}
static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) { static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt; SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pName, pCxt->pDbs); int32_t code = collectUseDatabase(pName, pCxt->pDbs);
...@@ -3201,7 +3215,7 @@ static int32_t translateExplain(STranslateContext* pCxt, SExplainStmt* pStmt) { ...@@ -3201,7 +3215,7 @@ static int32_t translateExplain(STranslateContext* pCxt, SExplainStmt* pStmt) {
} }
static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt) { static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt) {
return getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta); return refreshGetTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta);
} }
static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt) { static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册