/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "parInsertUtil.h" #include "parToken.h" #include "tglobal.h" #include "ttime.h" #define NEXT_TOKEN_WITH_PREV(pSql, token) \ do { \ int32_t index = 0; \ token = tStrGetToken(pSql, &index, true); \ pSql += index; \ } while (0) #define NEXT_TOKEN_KEEP_SQL(pSql, token, index) \ do { \ token = tStrGetToken(pSql, &index, false); \ } while (0) #define NEXT_VALID_TOKEN(pSql, token) \ do { \ (token).n = tGetToken(pSql, &(token).type); \ (token).z = (char*)pSql; \ pSql += (token).n; \ } while (TK_NK_SPACE == (token).type) typedef struct SInsertParseContext { SParseContext* pComCxt; SMsgBuf msg; char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW]; SParsedDataColInfo tags; // for stmt bool missCache; bool usingDuplicateTable; bool forceUpdate; } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; static bool isNullStr(SToken* pToken) { return ((pToken->type == TK_NK_STRING) && (strlen(TSDB_DATA_NULL_STR_L) == pToken->n) && (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)); } static bool isNullValue(int8_t dataType, SToken* pToken) { return TK_NULL == pToken->type || (!IS_STR_DATA_TYPE(dataType) && isNullStr(pToken)); } static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) { errno = 0; *value = taosStr2Double(pToken->z, endPtr); // not a valid integer number, return error if ((*endPtr - pToken->z) != pToken->n) { return TK_NK_ILLEGAL; } return pToken->type; } static int32_t skipInsertInto(const char** pSql, SMsgBuf* pMsg) { SToken token; NEXT_TOKEN(*pSql, token); if (TK_INSERT != token.type && TK_IMPORT != token.type) { return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", token.z); } NEXT_TOKEN(*pSql, token); if (TK_INTO != token.type) { return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", token.z); } return TSDB_CODE_SUCCESS; } static int32_t skipParentheses(SInsertParseContext* pCxt, const char** pSql) { SToken token; int32_t expectRightParenthesis = 1; while (1) { NEXT_TOKEN(*pSql, token); if (TK_NK_LP == token.type) { ++expectRightParenthesis; } else if (TK_NK_RP == token.type && 0 == --expectRightParenthesis) { break; } if (0 == token.n) { return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL); } } return TSDB_CODE_SUCCESS; } static int32_t skipTableOptions(SInsertParseContext* pCxt, const char** pSql) { do { int32_t index = 0; SToken token; NEXT_TOKEN_KEEP_SQL(*pSql, token, index); if (TK_TTL == token.type || TK_COMMENT == token.type) { *pSql += index; NEXT_TOKEN_WITH_PREV(*pSql, token); } else { break; } } while (1); return TSDB_CODE_SUCCESS; } // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) static int32_t ignoreUsingClause(SInsertParseContext* pCxt, const char** pSql) { int32_t code = TSDB_CODE_SUCCESS; SToken token; NEXT_TOKEN(*pSql, token); NEXT_TOKEN(*pSql, token); if (TK_NK_LP == token.type) { code = skipParentheses(pCxt, pSql); if (TSDB_CODE_SUCCESS == code) { NEXT_TOKEN(*pSql, token); } } // pSql -> TAGS (tag1_value, ...) if (TSDB_CODE_SUCCESS == code) { if (TK_TAGS != token.type) { code = buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", token.z); } else { NEXT_TOKEN(*pSql, token); } } if (TSDB_CODE_SUCCESS == code) { if (TK_NK_LP != token.type) { code = buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z); } else { code = skipParentheses(pCxt, pSql); } } if (TSDB_CODE_SUCCESS == code) { code = skipTableOptions(pCxt, pSql); } return code; } static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pDuplicate) { *pDuplicate = false; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStmt->targetTableName, tbFName); STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, tbFName, strlen(tbFName)); if (NULL != pMeta) { *pDuplicate = true; int32_t code = ignoreUsingClause(pCxt, &pStmt->pSql); if (TSDB_CODE_SUCCESS == code) { return cloneTableMeta(*pMeta, &pStmt->pTableMeta); } } return TSDB_CODE_SUCCESS; } // pStmt->pSql -> field1_name, ...) static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags, SParsedDataColInfo* pColList, SSchema* pSchema) { col_id_t nCols = pColList->numOfCols; pColList->numOfBound = 0; pColList->boundNullLen = 0; memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols); for (col_id_t i = 0; i < nCols; ++i) { pColList->cols[i].valStat = VAL_STAT_NONE; } SToken token; bool isOrdered = true; col_id_t lastColIdx = -1; // last column found while (1) { NEXT_TOKEN(*pSql, token); if (TK_NK_RP == token.type) { break; } char tmpTokenBuf[TSDB_COL_NAME_LEN + 2] = {0}; // used for deleting Escape character backstick(`) strncpy(tmpTokenBuf, token.z, token.n); token.z = tmpTokenBuf; token.n = strdequote(token.z); col_id_t t = lastColIdx + 1; col_id_t index = insFindCol(&token, t, nCols, pSchema); if (index < 0 && t > 0) { index = insFindCol(&token, 0, t, pSchema); isOrdered = false; } if (index < 0) { return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z); } if (pColList->cols[index].valStat == VAL_STAT_HAS) { return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z); } lastColIdx = index; pColList->cols[index].valStat = VAL_STAT_HAS; pColList->boundColumns[pColList->numOfBound] = index; ++pColList->numOfBound; switch (pSchema[t].type) { case TSDB_DATA_TYPE_BINARY: pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES); break; case TSDB_DATA_TYPE_NCHAR: pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); break; default: pColList->boundNullLen += TYPE_BYTES[pSchema[t].type]; break; } } if (!isTags && pColList->cols[0].valStat == VAL_STAT_NONE) { return buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null"); } pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; if (!isOrdered) { pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo)); if (NULL == pColList->colIdxInfo) { return TSDB_CODE_OUT_OF_MEMORY; } SBoundIdxInfo* pColIdx = pColList->colIdxInfo; for (col_id_t i = 0; i < pColList->numOfBound; ++i) { pColIdx[i].schemaColIdx = pColList->boundColumns[i]; pColIdx[i].boundIdx = i; } taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar); for (col_id_t i = 0; i < pColList->numOfBound; ++i) { pColIdx[i].finalIdx = i; } taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar); } if (pColList->numOfCols > pColList->numOfBound) { memset(&pColList->boundColumns[pColList->numOfBound], 0, sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound)); } return TSDB_CODE_SUCCESS; } static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) { int32_t index = 0; int64_t interval; int64_t ts = 0; const char* pTokenEnd = *end; if (pToken->type == TK_NOW) { ts = taosGetTimestamp(timePrec); } else if (pToken->type == TK_TODAY) { ts = taosGetTimestampToday(timePrec); } else if (pToken->type == TK_NK_INTEGER) { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &ts)) { return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z); } } else { // parse the RFC-3339/ISO-8601 timestamp format string if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) { return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z); } return TSDB_CODE_SUCCESS; } for (int k = pToken->n; pToken->z[k] != '\0'; k++) { if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue; if (pToken->z[k] == '(' && pToken->z[k + 1] == ')') { // for insert NOW()/TODAY() *end = pTokenEnd = &pToken->z[k + 2]; k++; continue; } if (pToken->z[k] == ',') { *end = pTokenEnd; *time = ts; return 0; } break; } /* * time expression: * e.g., now+12a, now-5h */ index = 0; SToken token = tStrGetToken(pTokenEnd, &index, false); pTokenEnd += index; if (token.type == TK_NK_MINUS || token.type == TK_NK_PLUS) { index = 0; SToken valueToken = tStrGetToken(pTokenEnd, &index, false); pTokenEnd += index; if (valueToken.n < 2) { return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", token.z); } char unit = 0; if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; } if (token.type == TK_NK_PLUS) { ts += interval; } else { ts = ts - interval; } *end = pTokenEnd; } *time = ts; return TSDB_CODE_SUCCESS; } static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, STagVal* val, SMsgBuf* pMsgBuf) { int64_t iv; uint64_t uv; char* endptr = NULL; if (isNullValue(pSchema->type, pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z); } return TSDB_CODE_SUCCESS; } // strcpy(val->colName, pSchema->name); val->cid = pSchema->colId; val->type = pSchema->type; switch (pSchema->type) { case TSDB_DATA_TYPE_BOOL: { if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) { if (strncmp(pToken->z, "true", pToken->n) == 0) { *(int8_t*)(&val->i64) = TRUE_VALUE; } else if (strncmp(pToken->z, "false", pToken->n) == 0) { *(int8_t*)(&val->i64) = FALSE_VALUE; } else { return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); } } else if (pToken->type == TK_NK_INTEGER) { *(int8_t*)(&val->i64) = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE); } else if (pToken->type == TK_NK_FLOAT) { *(int8_t*)(&val->i64) = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE); } else { return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); } break; } case TSDB_DATA_TYPE_TINYINT: { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z); } else if (!IS_VALID_TINYINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z); } *(int8_t*)(&val->i64) = iv; break; } case TSDB_DATA_TYPE_UTINYINT: { if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z); } else if (uv > UINT8_MAX) { return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z); } *(uint8_t*)(&val->i64) = uv; break; } case TSDB_DATA_TYPE_SMALLINT: { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z); } else if (!IS_VALID_SMALLINT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z); } *(int16_t*)(&val->i64) = iv; break; } case TSDB_DATA_TYPE_USMALLINT: { if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z); } else if (uv > UINT16_MAX) { return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z); } *(uint16_t*)(&val->i64) = uv; break; } case TSDB_DATA_TYPE_INT: { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z); } else if (!IS_VALID_INT(iv)) { return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z); } *(int32_t*)(&val->i64) = iv; break; } case TSDB_DATA_TYPE_UINT: { if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z); } else if (uv > UINT32_MAX) { return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z); } *(uint32_t*)(&val->i64) = uv; break; } case TSDB_DATA_TYPE_BIGINT: { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z); } val->i64 = iv; break; } case TSDB_DATA_TYPE_UBIGINT: { if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z); } *(uint64_t*)(&val->i64) = uv; break; } case TSDB_DATA_TYPE_FLOAT: { double dv; if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z); } if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) { return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z); } *(float*)(&val->i64) = dv; break; } case TSDB_DATA_TYPE_DOUBLE: { double dv; if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z); } if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z); } *(double*)(&val->i64) = dv; break; } case TSDB_DATA_TYPE_BINARY: { // Too long values will raise the invalid sql error message if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); } val->pData = strdup(pToken->z); val->nData = pToken->n; break; } case TSDB_DATA_TYPE_NCHAR: { int32_t output = 0; void* p = taosMemoryCalloc(1, pSchema->bytes - VARSTR_HEADER_SIZE); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)(p), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) { if (errno == E2BIG) { taosMemoryFree(p); return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); } char buf[512] = {0}; snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno)); taosMemoryFree(p); return buildSyntaxErrMsg(pMsgBuf, buf, pToken->z); } val->pData = p; val->nData = output; break; } case TSDB_DATA_TYPE_TIMESTAMP: { if (parseTime(end, pToken, timePrec, &iv, pMsgBuf) != TSDB_CODE_SUCCESS) { return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z); } val->i64 = iv; break; } } return TSDB_CODE_SUCCESS; } // input pStmt->pSql: [(tag1_name, ...)] TAGS (tag1_value, ...) ... // output pStmt->pSql: TAGS (tag1_value, ...) ... static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { SSchema* pTagsSchema = getTableTagSchema(pStmt->pTableMeta); insSetBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pStmt->pTableMeta)); SToken token; int32_t index = 0; NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index); if (TK_NK_LP != token.type) { return TSDB_CODE_SUCCESS; } pStmt->pSql += index; return parseBoundColumns(pCxt, &pStmt->pSql, true, &pCxt->tags, pTagsSchema); } static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken, SArray* pTagName, SArray* pTagVals, STag** pTag) { if (!isNullValue(pTagSchema->type, pToken)) { taosArrayPush(pTagName, pTagSchema->name); } if (pTagSchema->type == TSDB_DATA_TYPE_JSON) { if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", pToken->z); } if (isNullValue(pTagSchema->type, pToken)) { return tTagNew(pTagVals, 1, true, pTag); } else { return parseJsontoTagData(pToken->z, pTagVals, pTag, &pCxt->msg); } } STagVal val = {0}; int32_t code = parseTagToken(&pStmt->pSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { taosArrayPush(pTagVals, &val); } return code; } static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) { insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid, pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); } static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL && pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) || (pToken->n == 0) || (pToken->type == TK_NK_RP)) { return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z); } // Remove quotation marks if (TK_NK_STRING == pToken->type) { if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) { return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z); } int32_t len = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW); pToken->z = tmpTokenBuf; pToken->n = len; } return TSDB_CODE_SUCCESS; } // pSql -> tag1_value, ...) static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { int32_t code = TSDB_CODE_SUCCESS; SSchema* pSchema = getTableTagSchema(pStmt->pTableMeta); SArray* pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal)); SArray* pTagName = taosArrayInit(8, TSDB_COL_NAME_LEN); SToken token; bool isParseBindParam = false; bool isJson = false; STag* pTag = NULL; for (int i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->tags.numOfBound; ++i) { NEXT_TOKEN_WITH_PREV(pStmt->pSql, token); if (token.type == TK_NK_QUESTION) { isParseBindParam = true; if (NULL == pCxt->pComCxt->pStmtCb) { code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", token.z); break; } continue; } if (isParseBindParam) { code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values"); break; } SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]]; isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON; code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { code = parseTagValue(pCxt, pStmt, pTagSchema, &token, pTagName, pTagVals, &pTag); } } if (TSDB_CODE_SUCCESS == code && !isParseBindParam && !isJson) { code = tTagNew(pTagVals, 1, false, &pTag); } if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { buildCreateTbReq(pStmt, pTag, pTagName); pTag = NULL; } for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { STagVal* p = (STagVal*)taosArrayGet(pTagVals, i); if (IS_VAR_DATA_TYPE(p->type)) { taosMemoryFreeClear(p->pData); } } taosArrayDestroy(pTagVals); taosArrayDestroy(pTagName); tTagFree(pTag); return code; } // input pStmt->pSql: TAGS (tag1_value, ...) [table_options] ... // output pStmt->pSql: [table_options] ... static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { SToken token; NEXT_TOKEN(pStmt->pSql, token); if (TK_TAGS != token.type) { return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", token.z); } NEXT_TOKEN(pStmt->pSql, token); if (TK_NK_LP != token.type) { return buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z); } int32_t code = parseTagsClauseImpl(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { NEXT_VALID_TOKEN(pStmt->pSql, token); if (TK_NK_COMMA == token.type) { code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED); } else if (TK_NK_RP != token.type) { code = buildSyntaxErrMsg(&pCxt->msg, ") is expected", token.z); } } return code; } static int32_t storeTableMeta(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { pStmt->pTableMeta->suid = pStmt->pTableMeta->uid; pStmt->pTableMeta->uid = pStmt->totalTbNum; pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE; STableMeta* pBackup = NULL; if (TSDB_CODE_SUCCESS != cloneTableMeta(pStmt->pTableMeta, &pBackup)) { return TSDB_CODE_OUT_OF_MEMORY; } char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStmt->targetTableName, tbFName); return taosHashPut(pStmt->pSubTableHashObj, tbFName, strlen(tbFName), &pBackup, POINTER_BYTES); } static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { do { int32_t index = 0; SToken token; NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index); if (TK_TTL == token.type) { pStmt->pSql += index; NEXT_TOKEN_WITH_PREV(pStmt->pSql, token); if (TK_NK_INTEGER != token.type) { return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z); } pStmt->createTblReq.ttl = taosStr2Int32(token.z, NULL, 10); if (pStmt->createTblReq.ttl < 0) { return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z); } } else if (TK_COMMENT == token.type) { pStmt->pSql += index; NEXT_TOKEN(pStmt->pSql, token); if (TK_NK_STRING != token.type) { return buildSyntaxErrMsg(&pCxt->msg, "Invalid option comment", token.z); } if (token.n >= TSDB_TB_COMMENT_LEN) { return buildSyntaxErrMsg(&pCxt->msg, "comment too long", token.z); } int32_t len = trimString(token.z, token.n, pCxt->tmpTokenBuf, TSDB_TB_COMMENT_LEN); pStmt->createTblReq.comment = strndup(pCxt->tmpTokenBuf, len); if (NULL == pStmt->createTblReq.comment) { return TSDB_CODE_OUT_OF_MEMORY; } pStmt->createTblReq.commentLen = len; } else { break; } } while (1); return TSDB_CODE_SUCCESS; } // input pStmt->pSql: // 1. [(tag1_name, ...)] ... // 2. VALUES ... | FILE ... // output pStmt->pSql: // 1. [(field1_name, ...)] // 2. VALUES ... | FILE ... static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { if (!pStmt->usingTableProcessing || pCxt->usingDuplicateTable) { return TSDB_CODE_SUCCESS; } int32_t code = parseBoundTagsClause(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { code = parseTagsClause(pCxt, pStmt); } if (TSDB_CODE_SUCCESS == code) { code = parseTableOptions(pCxt, pStmt); } return code; } static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache) { char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTbName, dbFName); int32_t code = TSDB_CODE_SUCCESS; bool pass = true; bool exists = true; if (pCxt->async) { code = catalogChkAuthFromCache(pCxt->pCatalog, pCxt->pUser, dbFName, AUTH_TYPE_WRITE, &pass, &exists); } else { SRequestConnInfo conn = {.pTrans = pCxt->pTransporter, .requestId = pCxt->requestId, .requestObjRefId = pCxt->requestRid, .mgmtEps = pCxt->mgmtEpSet}; code = catalogChkAuth(pCxt->pCatalog, &conn, pCxt->pUser, dbFName, AUTH_TYPE_WRITE, &pass); } if (TSDB_CODE_SUCCESS == code) { if (!exists) { *pMissCache = true; } else if (!pass) { code = TSDB_CODE_PAR_PERMISSION_DENIED; } } return code; } static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta, bool* pMissCache) { SParseContext* pComCxt = pCxt->pComCxt; int32_t code = TSDB_CODE_SUCCESS; if (pComCxt->async) { if (isStb) { code = catalogGetCachedSTableMeta(pComCxt->pCatalog, pTbName, pTableMeta); } else { code = catalogGetCachedTableMeta(pComCxt->pCatalog, pTbName, pTableMeta); } } else { SRequestConnInfo conn = {.pTrans = pComCxt->pTransporter, .requestId = pComCxt->requestId, .requestObjRefId = pComCxt->requestRid, .mgmtEps = pComCxt->mgmtEpSet}; if (isStb) { code = catalogGetSTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta); } else { code = catalogGetTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta); } } if (TSDB_CODE_SUCCESS == code) { if (NULL == *pTableMeta) { *pMissCache = true; } else if (isStb && TSDB_SUPER_TABLE != (*pTableMeta)->tableType) { code = buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); } else if (!isStb && TSDB_SUPER_TABLE == (*pTableMeta)->tableType) { code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported"); } } return code; } static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool isStb, bool* pMissCache) { int32_t code = TSDB_CODE_SUCCESS; SVgroupInfo vg; bool exists = true; if (pCxt->async) { code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists); } else { SRequestConnInfo conn = {.pTrans = pCxt->pTransporter, .requestId = pCxt->requestId, .requestObjRefId = pCxt->requestRid, .mgmtEps = pCxt->mgmtEpSet}; code = catalogGetTableHashVgroup(pCxt->pCatalog, &conn, &pStmt->targetTableName, &vg); } if (TSDB_CODE_SUCCESS == code) { if (exists) { if (isStb) { pStmt->pTableMeta->vgId = vg.vgId; } code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } *pMissCache = !exists; } return code; } static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { if (pCxt->forceUpdate) { pCxt->missCache = true; return TSDB_CODE_SUCCESS; } int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); } return code; } static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pTbName) { return insCreateSName(&pStmt->usingTableName, pTbName, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); } static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { if (pCxt->forceUpdate) { pCxt->missCache = true; return TSDB_CODE_SUCCESS; } int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache); } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache); } return code; } static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { SToken token; NEXT_TOKEN(pStmt->pSql, token); int32_t code = preParseUsingTableName(pCxt, pStmt, &token); if (TSDB_CODE_SUCCESS == code) { code = getUsingTableSchema(pCxt, pStmt); } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = storeTableMeta(pCxt, pStmt); } return code; } // input pStmt->pSql: // 1(care). [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]] ... // 2. VALUES ... | FILE ... // output pStmt->pSql: // 1. [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]] ... // 2. VALUES ... | FILE ... static int32_t parseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { SToken token; int32_t index = 0; NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index); if (TK_USING != token.type) { return getTargetTableSchema(pCxt, pStmt); } pStmt->usingTableProcessing = true; // pStmt->pSql -> stb_name [(tag1_name, ...) pStmt->pSql += index; int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &pCxt->usingDuplicateTable); if (TSDB_CODE_SUCCESS == code && !pCxt->usingDuplicateTable) { return parseUsingTableNameImpl(pCxt, pStmt); } return code; } static int32_t preParseTargetTableName(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pTbName) { return insCreateSName(&pStmt->targetTableName, pTbName, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); } // input pStmt->pSql: // 1(care). [(field1_name, ...)] ... // 2. [ USING ... ] ... // 3. VALUES ... | FILE ... // output pStmt->pSql: // 1. [ USING ... ] ... // 2. VALUES ... | FILE ... static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { SToken token; int32_t index = 0; NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index); if (TK_NK_LP != token.type) { return TSDB_CODE_SUCCESS; } // pStmt->pSql -> field1_name, ...) pStmt->pSql += index; pStmt->pBoundCols = pStmt->pSql; return skipParentheses(pCxt, &pStmt->pSql); } static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks** pDataBuf) { if (pCxt->pComCxt->async) { uint64_t uid = pStmt->pTableMeta->uid; if (pStmt->usingTableProcessing) { pStmt->pTableMeta->uid = 0; } return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, &pStmt->createTblReq); } char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pStmt->targetTableName, tbFName); return insGetDataBlockFromList(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, &pStmt->createTblReq); } static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf) { SToken token; int32_t index = 0; NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index); if (TK_NK_LP == token.type) { pStmt->pSql += index; if (NULL != pStmt->pBoundCols) { return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z); } // pStmt->pSql -> field1_name, ...) return parseBoundColumns(pCxt, &pStmt->pSql, false, &pDataBuf->boundColumnInfo, getTableColumnSchema(pStmt->pTableMeta)); } if (NULL != pStmt->pBoundCols) { return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, &pDataBuf->boundColumnInfo, getTableColumnSchema(pStmt->pTableMeta)); } return TSDB_CODE_SUCCESS; } // input pStmt->pSql: // 1. [(tag1_name, ...)] ... // 2. VALUES ... | FILE ... // output pStmt->pSql: VALUES ... | FILE ... static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks** pDataBuf) { int32_t code = parseUsingClauseBottom(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { code = getTableDataBlocks(pCxt, pStmt, pDataBuf); } if (TSDB_CODE_SUCCESS == code) { code = parseBoundColumnsClause(pCxt, pStmt, *pDataBuf); } return code; } // input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ... // output pStmt->pSql: // 1. [(tag1_name, ...)] ... // 2. VALUES ... | FILE ... static int32_t parseSchemaClauseTop(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pTbName) { int32_t code = preParseTargetTableName(pCxt, pStmt, pTbName); if (TSDB_CODE_SUCCESS == code) { // option: [(field1_name, ...)] code = preParseBoundColumnsClause(pCxt, pStmt); } if (TSDB_CODE_SUCCESS == code) { // option: [USING stb_name] code = parseUsingTableName(pCxt, pStmt); } return code; } static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema, int16_t timePrec, _row_append_fn_t func, void* param) { int64_t iv; uint64_t uv; char* endptr = NULL; switch (pSchema->type) { case TSDB_DATA_TYPE_BOOL: { if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) { if (strncmp(pToken->z, "true", pToken->n) == 0) { return func(&pCxt->msg, &TRUE_VALUE, pSchema->bytes, param); } else if (strncmp(pToken->z, "false", pToken->n) == 0) { return func(&pCxt->msg, &FALSE_VALUE, pSchema->bytes, param); } else { return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); } } else if (pToken->type == TK_NK_INTEGER) { return func(&pCxt->msg, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); } else if (pToken->type == TK_NK_FLOAT) { return func(&pCxt->msg, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); } else { return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); } } case TSDB_DATA_TYPE_TINYINT: { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z); } else if (!IS_VALID_TINYINT(iv)) { return buildSyntaxErrMsg(&pCxt->msg, "tinyint data overflow", pToken->z); } uint8_t tmpVal = (uint8_t)iv; return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); } case TSDB_DATA_TYPE_UTINYINT: { if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z); } else if (uv > UINT8_MAX) { return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z); } uint8_t tmpVal = (uint8_t)uv; return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); } case TSDB_DATA_TYPE_SMALLINT: { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z); } else if (!IS_VALID_SMALLINT(iv)) { return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z); } int16_t tmpVal = (int16_t)iv; return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); } case TSDB_DATA_TYPE_USMALLINT: { if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z); } else if (uv > UINT16_MAX) { return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z); } uint16_t tmpVal = (uint16_t)uv; return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); } case TSDB_DATA_TYPE_INT: { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z); } else if (!IS_VALID_INT(iv)) { return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z); } int32_t tmpVal = (int32_t)iv; return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); } case TSDB_DATA_TYPE_UINT: { if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z); } else if (uv > UINT32_MAX) { return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z); } uint32_t tmpVal = (uint32_t)uv; return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); } case TSDB_DATA_TYPE_BIGINT: { if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z); } return func(&pCxt->msg, &iv, pSchema->bytes, param); } case TSDB_DATA_TYPE_UBIGINT: { if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z); } return func(&pCxt->msg, &uv, pSchema->bytes, param); } case TSDB_DATA_TYPE_FLOAT: { double dv; if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); } if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) { return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z); } float tmpVal = (float)dv; return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); } case TSDB_DATA_TYPE_DOUBLE: { double dv; if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) { return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); } if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z); } return func(&pCxt->msg, &dv, pSchema->bytes, param); } case TSDB_DATA_TYPE_BINARY: { // Too long values will raise the invalid sql error message if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); } return func(&pCxt->msg, pToken->z, pToken->n, param); } case TSDB_DATA_TYPE_NCHAR: { return func(&pCxt->msg, pToken->z, pToken->n, param); } case TSDB_DATA_TYPE_JSON: { if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", pToken->z); } return func(&pCxt->msg, pToken->z, pToken->n, param); } case TSDB_DATA_TYPE_TIMESTAMP: { int64_t tmpVal; if (parseTime(pSql, pToken, timePrec, &tmpVal, &pCxt->msg) != TSDB_CODE_SUCCESS) { return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z); } return func(&pCxt->msg, &tmpVal, pSchema->bytes, param); } } return TSDB_CODE_FAILED; } static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema, int16_t timePrec, _row_append_fn_t func, void* param) { int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp should not be null", pToken->z); } return func(&pCxt->msg, NULL, 0, param); } if (TSDB_CODE_SUCCESS == code && IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z); } if (TSDB_CODE_SUCCESS == code) { code = parseValueTokenImpl(pCxt, pSql, pToken, pSchema, timePrec, func, param); } return code; } static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataBlocks* pDataBuf, bool* pGotRow, SToken* pToken) { SRowBuilder* pBuilder = &pDataBuf->rowBuilder; STSRow* row = (STSRow*)(pDataBuf->pData + pDataBuf->size); // skip the SSubmitBlk header SParsedDataColInfo* pCols = &pDataBuf->boundColumnInfo; bool isParseBindParam = false; SSchema* pSchemas = getTableColumnSchema(pDataBuf->pTableMeta); SMemParam param = {.rb = pBuilder}; int32_t code = tdSRowResetBuf(pBuilder, row); // 1. set the parsed value from sql string for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) { NEXT_TOKEN_WITH_PREV(*pSql, *pToken); SSchema* pSchema = &pSchemas[pCols->boundColumns[i]]; if (pToken->type == TK_NK_QUESTION) { isParseBindParam = true; if (NULL == pCxt->pComCxt->pStmtCb) { code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z); } continue; } if (TSDB_CODE_SUCCESS == code && TK_NK_RP == pToken->type) { code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); } if (TSDB_CODE_SUCCESS == code && isParseBindParam) { code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values"); } if (TSDB_CODE_SUCCESS == code) { param.schema = pSchema; insGetSTSRowAppendInfo(pBuilder->rowType, pCols, i, ¶m.toffset, ¶m.colIdx); code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pDataBuf->pTableMeta).precision, insMemRowAppend, ¶m); } if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) { NEXT_VALID_TOKEN(*pSql, *pToken); if (TK_NK_COMMA != pToken->type) { code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z); } } } if (TSDB_CODE_SUCCESS == code) { TSKEY tsKey = TD_ROW_KEY(row); code = insCheckTimestamp(pDataBuf, (const char*)&tsKey); } if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { // set the null value for the columns that do not assign values if ((pCols->numOfBound < pCols->numOfCols) && TD_IS_TP_ROW(row)) { pBuilder->hasNone = true; } tdSRowEnd(pBuilder); *pGotRow = true; #ifdef TD_DEBUG_PRINT_ROW STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1); tdSRowPrint(row, pSTSchema, __func__); taosMemoryFree(pSTSchema); #endif } return code; } static int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) { size_t remain = pDataBlock->nAllocSize - pDataBlock->size; const int factor = 5; uint32_t nAllocSizeOld = pDataBlock->nAllocSize; // expand the allocated size if (remain < rowSize * factor) { while (remain < rowSize * factor) { pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5); remain = pDataBlock->nAllocSize - pDataBlock->size; } char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize); if (tmp != NULL) { pDataBlock->pData = tmp; memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size); } else { // do nothing, if allocate more memory failed pDataBlock->nAllocSize = nAllocSizeOld; *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; return TSDB_CODE_OUT_OF_MEMORY; } } *numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize; return TSDB_CODE_SUCCESS; } // pSql -> (field1_value, ...) [(field1_value2, ...) ...] static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf, int32_t maxRows, int32_t* pNumOfRows, SToken* pToken) { int32_t code = insInitRowBuilder(&pDataBuf->rowBuilder, pDataBuf->pTableMeta->sversion, &pDataBuf->boundColumnInfo); int32_t extendedRowSize = insGetExtendedRowSize(pDataBuf); (*pNumOfRows) = 0; while (TSDB_CODE_SUCCESS == code) { int32_t index = 0; NEXT_TOKEN_KEEP_SQL(pStmt->pSql, *pToken, index); if (TK_NK_LP != pToken->type) { break; } pStmt->pSql += index; if ((*pNumOfRows) >= maxRows || pDataBuf->size + extendedRowSize >= pDataBuf->nAllocSize) { code = allocateMemIfNeed(pDataBuf, extendedRowSize, &maxRows); } bool gotRow = false; if (TSDB_CODE_SUCCESS == code) { code = parseOneRow(pCxt, &pStmt->pSql, pDataBuf, &gotRow, pToken); } if (TSDB_CODE_SUCCESS == code) { NEXT_VALID_TOKEN(pStmt->pSql, *pToken); if (TK_NK_COMMA == pToken->type) { code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); } else if (TK_NK_RP != pToken->type) { code = buildSyntaxErrMsg(&pCxt->msg, ") expected", pToken->z); } } if (TSDB_CODE_SUCCESS == code && gotRow) { pDataBuf->size += extendedRowSize; (*pNumOfRows)++; } } if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) { code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL); } return code; } // VALUES (field1_value, ...) [(field1_value2, ...) ...] static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf, SToken* pToken) { int32_t maxNumOfRows = 0; int32_t numOfRows = 0; int32_t code = allocateMemIfNeed(pDataBuf, insGetExtendedRowSize(pDataBuf), &maxNumOfRows); if (TSDB_CODE_SUCCESS == code) { code = parseValues(pCxt, pStmt, pDataBuf, maxNumOfRows, &numOfRows, pToken); } if (TSDB_CODE_SUCCESS == code) { code = insSetBlockInfo((SSubmitBlk*)(pDataBuf->pData), pDataBuf, numOfRows, &pCxt->msg); } if (TSDB_CODE_SUCCESS == code) { pDataBuf->numOfTables = 1; pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_INSERT); } return code; } static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf, int maxRows, int32_t* pNumOfRows) { int32_t code = insInitRowBuilder(&pDataBuf->rowBuilder, pDataBuf->pTableMeta->sversion, &pDataBuf->boundColumnInfo); int32_t extendedRowSize = insGetExtendedRowSize(pDataBuf); (*pNumOfRows) = 0; char* pLine = NULL; int64_t readLen = 0; pStmt->fileProcessing = false; while (TSDB_CODE_SUCCESS == code && (readLen = taosGetLineFile(pStmt->fp, &pLine)) != -1) { if (('\r' == pLine[readLen - 1]) || ('\n' == pLine[readLen - 1])) { pLine[--readLen] = '\0'; } if (readLen == 0) { continue; } if ((*pNumOfRows) >= maxRows || pDataBuf->size + extendedRowSize >= pDataBuf->nAllocSize) { code = allocateMemIfNeed(pDataBuf, extendedRowSize, &maxRows); } bool gotRow = false; if (TSDB_CODE_SUCCESS == code) { SToken token; strtolower(pLine, pLine); const char* pRow = pLine; code = parseOneRow(pCxt, (const char**)&pRow, pDataBuf, &gotRow, &token); } if (TSDB_CODE_SUCCESS == code && gotRow) { pDataBuf->size += extendedRowSize; (*pNumOfRows)++; } if (TSDB_CODE_SUCCESS == code && pDataBuf->nAllocSize > tsMaxMemUsedByInsert * 1024 * 1024) { pStmt->fileProcessing = true; break; } } taosMemoryFree(pLine); if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) { code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL); } return code; } static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf) { int32_t maxNumOfRows = 0; int32_t numOfRows = 0; int32_t code = allocateMemIfNeed(pDataBuf, insGetExtendedRowSize(pDataBuf), &maxNumOfRows); if (TSDB_CODE_SUCCESS == code) { code = parseCsvFile(pCxt, pStmt, pDataBuf, maxNumOfRows, &numOfRows); } if (TSDB_CODE_SUCCESS == code) { code = insSetBlockInfo((SSubmitBlk*)(pDataBuf->pData), pDataBuf, numOfRows, &pCxt->msg); } if (TSDB_CODE_SUCCESS == code) { pDataBuf->numOfTables = 1; pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT); if (!pStmt->fileProcessing) { taosCloseFile(&pStmt->fp); } else { parserDebug("0x%" PRIx64 " insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId); } } return code; } static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pFilePath, STableDataBlocks* pDataBuf) { char filePathStr[TSDB_FILENAME_LEN] = {0}; if (TK_NK_STRING == pFilePath->type) { trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr)); } else { strncpy(filePathStr, pFilePath->z, pFilePath->n); } pStmt->fp = taosOpenFile(filePathStr, TD_FILE_READ | TD_FILE_STREAM); if (NULL == pStmt->fp) { return TAOS_SYSTEM_ERROR(errno); } return parseDataFromFileImpl(pCxt, pStmt, pDataBuf); } static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf, SToken* pToken) { NEXT_TOKEN(pStmt->pSql, *pToken); if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) { return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z); } return parseDataFromFile(pCxt, pStmt, pToken, pDataBuf); } // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf) { SToken token; NEXT_TOKEN(pStmt->pSql, token); switch (token.type) { case TK_VALUES: return parseValuesClause(pCxt, pStmt, pDataBuf, &token); case TK_FILE: return parseFileClause(pCxt, pStmt, pDataBuf, &token); default: break; } return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z); } // input pStmt->pSql: // 1. [(tag1_name, ...)] ... // 2. VALUES ... | FILE ... static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { STableDataBlocks* pDataBuf = NULL; int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pDataBuf); if (TSDB_CODE_SUCCESS == code) { code = parseDataClause(pCxt, pStmt, pDataBuf); } return code; } static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { destroyBoundColumnInfo(&pCxt->tags); taosMemoryFreeClear(pStmt->pTableMeta); tdDestroySVCreateTbReq(&pStmt->createTblReq); pCxt->missCache = false; pCxt->usingDuplicateTable = false; pStmt->pBoundCols = NULL; pStmt->usingTableProcessing = false; pStmt->fileProcessing = false; } // input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ... static int32_t parseInsertTableClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pTbName) { resetEnvPreTable(pCxt, pStmt); int32_t code = parseSchemaClauseTop(pCxt, pStmt, pTbName); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = parseInsertTableClauseBottom(pCxt, pStmt); } return code; } static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pTbName, bool* pHasData) { // no data in the sql string anymore. if (0 == pTbName->n) { if (0 != pTbName->type && '\0' != pStmt->pSql[0]) { return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", pTbName->z); } if (0 == pStmt->totalRowsNum && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) { return buildInvalidOperationMsg(&pCxt->msg, "no data in sql"); } *pHasData = false; return TSDB_CODE_SUCCESS; } if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && pStmt->totalTbNum > 0) { return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt"); } if (TK_NK_QUESTION == pTbName->type) { if (NULL == pCxt->pComCxt->pStmtCb) { return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z); } char* tbName = NULL; int32_t code = (*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName); if (TSDB_CODE_SUCCESS == code) { pTbName->z = tbName; pTbName->n = strlen(tbName); } else { return code; } } *pHasData = true; return TSDB_CODE_SUCCESS; } static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags)); if (NULL == tags) { return TSDB_CODE_OUT_OF_MEMORY; } memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb; int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pStmt->pVgroupsHashObj = NULL; pStmt->pTableBlockHashObj = NULL; return code; } static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { return setStmtInfo(pCxt, pStmt); } // merge according to vgId int32_t code = TSDB_CODE_SUCCESS; if (taosHashGetSize(pStmt->pTableBlockHashObj) > 0) { code = insMergeTableDataBlocks(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks); } if (TSDB_CODE_SUCCESS == code) { code = insBuildOutput(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); } return code; } // tb_name // [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] // [(field1_name, ...)] // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { SToken token; int32_t code = TSDB_CODE_SUCCESS; bool hasData = true; // for each table while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache && !pStmt->fileProcessing) { // pStmt->pSql -> tb_name ... NEXT_TOKEN(pStmt->pSql, token); code = checkTableClauseFirstToken(pCxt, pStmt, &token, &hasData); if (TSDB_CODE_SUCCESS == code && hasData) { code = parseInsertTableClause(pCxt, pStmt, &token); } } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = parseInsertBodyBottom(pCxt, pStmt); } return code; } static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); } static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, SNode** pOutput) { SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); if (NULL == pStmt) { return TSDB_CODE_OUT_OF_MEMORY; } if (pCxt->pComCxt->pStmtCb) { TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT); } pStmt->pSql = pCxt->pComCxt->pSql; pStmt->freeHashFunc = insDestroyBlockHashmap; pStmt->freeArrayFunc = insDestroyBlockArrayList; if (!reentry) { pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pStmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); } pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); if ((!reentry && (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj)) || NULL == pStmt->pSubTableHashObj || NULL == pStmt->pTableNameHashObj || NULL == pStmt->pDbFNameHashObj) { nodesDestroyNode((SNode*)pStmt); return TSDB_CODE_OUT_OF_MEMORY; } taosHashSetFreeFp(pStmt->pSubTableHashObj, destroySubTableHashElem); *pOutput = (SNode*)pStmt; return TSDB_CODE_SUCCESS; } static int32_t createInsertQuery(SInsertParseContext* pCxt, SQuery** pOutput) { SQuery* pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); if (NULL == pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->haveResultSet = false; pQuery->msgType = TDMT_VND_SUBMIT; int32_t code = createVnodeModifOpStmt(pCxt, false, &pQuery->pRoot); if (TSDB_CODE_SUCCESS == code) { *pOutput = pQuery; } else { nodesDestroyNode((SNode*)pQuery); } return code; } static int32_t checkAuthFromMetaData(const SArray* pUsers) { if (1 != taosArrayGetSize(pUsers)) { return TSDB_CODE_FAILED; } SMetaRes* pRes = taosArrayGet(pUsers, 0); if (TSDB_CODE_SUCCESS == pRes->code) { return (*(bool*)pRes->pRes) ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED; } return pRes->code; } static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMeta) { if (1 != taosArrayGetSize(pTables)) { return TSDB_CODE_FAILED; } SMetaRes* pRes = taosArrayGet(pTables, 0); if (TSDB_CODE_SUCCESS == pRes->code) { *pMeta = tableMetaDup((const STableMeta*)pRes->pRes); if (NULL == *pMeta) { return TSDB_CODE_OUT_OF_MEMORY; } } return pRes->code; } static int32_t getTableVgroupFromMetaData(const SArray* pTables, SVnodeModifOpStmt* pStmt, bool isStb) { if (1 != taosArrayGetSize(pTables)) { return TSDB_CODE_FAILED; } SMetaRes* pRes = taosArrayGet(pTables, 0); if (TSDB_CODE_SUCCESS != pRes->code) { return pRes->code; } SVgroupInfo* pVg = pRes->pRes; if (isStb) { pStmt->pTableMeta->vgId = pVg->vgId; } return taosHashPut(pStmt->pVgroupsHashObj, (const char*)&pVg->vgId, sizeof(pVg->vgId), (char*)pVg, sizeof(SVgroupInfo)); } static int32_t getTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData, SVnodeModifOpStmt* pStmt, bool isStb) { int32_t code = checkAuthFromMetaData(pMetaData->pUser); if (TSDB_CODE_SUCCESS == code) { code = getTableMetaFromMetaData(pMetaData->pTableMeta, &pStmt->pTableMeta); } if (TSDB_CODE_SUCCESS == code && !isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) { code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported"); } if (TSDB_CODE_SUCCESS == code) { code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb); } return code; } static void destoryTablesReq(void* p) { STablesReq* pRes = (STablesReq*)p; taosArrayDestroy(pRes->pTables); } static void clearCatalogReq(SCatalogReq* pCatalogReq) { if (NULL == pCatalogReq) { return; } taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq); pCatalogReq->pTableMeta = NULL; taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq); pCatalogReq->pTableHash = NULL; taosArrayDestroy(pCatalogReq->pUser); pCatalogReq->pUser = NULL; } static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SVnodeModifOpStmt* pStmt) { clearCatalogReq(pCatalogReq); if (pStmt->usingTableProcessing) { return getTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true); } return getTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false); } static int32_t resetVnodeModifOpStmt(SInsertParseContext* pCxt, SQuery* pQuery) { nodesDestroyNode(pQuery->pRoot); int32_t code = createVnodeModifOpStmt(pCxt, true, &pQuery->pRoot); if (TSDB_CODE_SUCCESS == code) { SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot; (*pCxt->pComCxt->pStmtCb->getExecInfoFn)(pCxt->pComCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj, &pStmt->pTableBlockHashObj); if (NULL == pStmt->pVgroupsHashObj) { pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); } if (NULL == pStmt->pTableBlockHashObj) { pStmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } if (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj) { code = TSDB_CODE_OUT_OF_MEMORY; } } return code; } static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SQuery** pQuery) { if (NULL == *pQuery) { return createInsertQuery(pCxt, pQuery); } if (NULL != pCxt->pComCxt->pStmtCb) { return resetVnodeModifOpStmt(pCxt, *pQuery); } SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)(*pQuery)->pRoot; if (!pStmt->fileProcessing) { return setVnodeModifOpStmt(pCxt, pCatalogReq, pMetaData, pStmt); } return TSDB_CODE_SUCCESS; } static int32_t setRefreshMate(SQuery* pQuery) { SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot; SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL); while (NULL != pTable) { taosArrayPush(pQuery->pTableList, pTable); pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable); } char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL); while (NULL != pDb) { taosArrayPush(pQuery->pDbList, pDb); pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb); } return TSDB_CODE_SUCCESS; } // INSERT INTO // tb_name // [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]] // [(field1_name, ...)] // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { int32_t code = skipInsertInto(&pStmt->pSql, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { code = parseInsertBody(pCxt, pStmt); } return code; } static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { STableDataBlocks* pDataBuf = NULL; int32_t code = getTableDataBlocks(pCxt, pStmt, &pDataBuf); if (TSDB_CODE_SUCCESS == code) { code = parseDataFromFileImpl(pCxt, pStmt, pDataBuf); } if (TSDB_CODE_SUCCESS == code) { if (pStmt->fileProcessing) { code = parseInsertBodyBottom(pCxt, pStmt); } else { code = parseInsertBody(pCxt, pStmt); } } return code; } static int32_t parseInsertSqlFromTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { int32_t code = parseInsertTableClauseBottom(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { code = parseInsertBody(pCxt, pStmt); } return code; } static int32_t parseInsertSqlImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { if (pStmt->pSql == pCxt->pComCxt->pSql || NULL != pCxt->pComCxt->pStmtCb) { return parseInsertSqlFromStart(pCxt, pStmt); } if (pStmt->fileProcessing) { return parseInsertSqlFromCsv(pCxt, pStmt); } return parseInsertSqlFromTable(pCxt, pStmt); } static int32_t buildInsertTableReq(SName* pName, SArray** pTables) { *pTables = taosArrayInit(1, sizeof(SName)); if (NULL == *pTables) { return TSDB_CODE_OUT_OF_MEMORY; } taosArrayPush(*pTables, pName); return TSDB_CODE_SUCCESS; } static int32_t buildInsertDbReq(SName* pName, SArray** pDbs) { if (NULL == *pDbs) { *pDbs = taosArrayInit(1, sizeof(STablesReq)); if (NULL == *pDbs) { return TSDB_CODE_OUT_OF_MEMORY; } } STablesReq req = {0}; tNameGetFullDbName(pName, req.dbFName); buildInsertTableReq(pName, &req.pTables); taosArrayPush(*pDbs, &req); return TSDB_CODE_SUCCESS; } static int32_t buildInsertUserAuthReq(const char* pUser, SName* pName, SArray** pUserAuth) { *pUserAuth = taosArrayInit(1, sizeof(SUserAuthInfo)); if (NULL == *pUserAuth) { return TSDB_CODE_OUT_OF_MEMORY; } SUserAuthInfo userAuth = {.type = AUTH_TYPE_WRITE}; snprintf(userAuth.user, sizeof(userAuth.user), "%s", pUser); tNameGetFullDbName(pName, userAuth.dbFName); taosArrayPush(*pUserAuth, &userAuth); return TSDB_CODE_SUCCESS; } static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SCatalogReq* pCatalogReq) { int32_t code = buildInsertUserAuthReq(pCxt->pComCxt->pUser, &pStmt->targetTableName, &pCatalogReq->pUser); if (TSDB_CODE_SUCCESS == code) { if (0 == pStmt->usingTableName.type) { code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableMeta); } else { code = buildInsertDbReq(&pStmt->usingTableName, &pCatalogReq->pTableMeta); } } if (TSDB_CODE_SUCCESS == code) { code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableHash); } return code; } static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) { if (pCxt->missCache) { parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId, ((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); pQuery->execStage = QUERY_EXEC_STAGE_PARSE; return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq); } parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId, ((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE; return TSDB_CODE_SUCCESS; } int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) { SInsertParseContext context = { .pComCxt = pCxt, .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, .missCache = false, .usingDuplicateTable = false, .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false) }; int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); if (TSDB_CODE_SUCCESS == code) { code = parseInsertSqlImpl(&context, (SVnodeModifOpStmt*)(*pQuery)->pRoot); } if (TSDB_CODE_SUCCESS == code) { code = setNextStageInfo(&context, *pQuery, pCatalogReq); } if ((TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) && QUERY_EXEC_STAGE_SCHEDULE == (*pQuery)->execStage) { code = setRefreshMate(*pQuery); } destroyBoundColumnInfo(&context.tags); return code; }