diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 4c13f76e243dbc5c557443b7ff39abc43d52b922..a35863bb5a216617700bc2664d5111a2e46d5d17 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -106,11 +106,10 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* void qDestroyBoundColInfo(void* pInfo); -void* smlInitHandle(SQuery* pQuery); -void smlDestroyHandle(void* pHandle); -int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, +SQuery* smlInitHandle(); +int32_t smlBindData(SQuery* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); -int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); +int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 719a6f82d24e6a1c48d5894d41481aefaa8afbc4..163e7044243195ee9d36c14e5229bd75f291d67b 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -170,7 +170,6 @@ typedef struct { SHashObj *childTables; SHashObj *superTables; SHashObj *pVgHash; - void *exec; STscObj *taos; SCatalog *pCatalog; @@ -712,21 +711,21 @@ static bool smlParseBool(SSmlKv *kvVal) { const char *pVal = kvVal->value; int32_t len = kvVal->length; if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) { - kvVal->i = true; + kvVal->i = TSDB_TRUE; return true; } if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) { - kvVal->i = false; + kvVal->i = TSDB_FALSE; return true; } if ((len == 4) && !strncasecmp(pVal, "true", len)) { - kvVal->i = true; + kvVal->i = TSDB_TRUE; return true; } if ((len == 5) && !strncasecmp(pVal, "false", len)) { - kvVal->i = false; + kvVal->i = TSDB_FALSE; return true; } return false; @@ -1488,7 +1487,6 @@ static void smlDestroyCols(SArray *cols) { static void smlDestroyInfo(SSmlHandle *info) { if (!info) return; qDestroyQuery(info->pQuery); - smlDestroyHandle(info->exec); // destroy info->childTables void **p1 = (void **)taosHashIterate(info->childTables, NULL); @@ -1526,19 +1524,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr } info->id = smlGenId(); - info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); - if (NULL == info->pQuery) { - uError("SML:0x%" PRIx64 " create info->pQuery error", info->id); - goto cleanup; - } - info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; - info->pQuery->haveResultSet = false; - info->pQuery->msgType = TDMT_VND_SUBMIT; - info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); - if (NULL == info->pQuery->pRoot) { - uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id); - goto cleanup; - } + info->pQuery = smlInitHandle(); if (pTscObj) { info->taos = pTscObj; @@ -1561,10 +1547,8 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr info->pRequest = request; info->msgBuf.buf = info->pRequest->msgBuf; info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; - info->pRequest->stmtType = info->pQuery->pRoot->type; } - info->exec = smlInitHandle(info->pQuery); info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -1577,7 +1561,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr goto cleanup; } } - if (NULL == info->exec || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash || + if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash || NULL == info->dumplicateKey) { uError("SML:0x%" PRIx64 " create info failed", info->id); goto cleanup; @@ -2337,7 +2321,7 @@ static int32_t smlInsertData(SSmlHandle *info) { (*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid - code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, + code = smlBindData(info->pQuery, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->ttl, info->msgBuf.buf, info->msgBuf.len); if (code != TSDB_CODE_SUCCESS) { @@ -2347,7 +2331,7 @@ static int32_t smlInsertData(SSmlHandle *info) { oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable); } - code = smlBuildOutput(info->exec, info->pVgHash); + code = smlBuildOutput(info->pQuery, info->pVgHash); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id); return code; diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index d64d1ce0c060286b041d4a7047af7e642afe0472..e4e8cd725b76530723b28199b50a677000175654 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -167,6 +167,7 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey); int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta, SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode); +int32_t initTableColSubmitData(STableDataCxt* pTableCxt); int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks); int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash); @@ -174,5 +175,5 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt); void insDestroyVgroupDataCxtList(SArray *pVgCxtList); void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash); void insDestroyTableDataCxt(STableDataCxt* pTableCxt); - +void destroyBoundColInfo(SBoundColInfo* pInfo); #endif // TDENGINE_PAR_INSERT_UTIL_H diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index e76ca7751dd6829b4b7c78635559ea5eeb847f69..8abb31b890ad4a562977e5e22844e6a545671ecc 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -45,95 +45,46 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* return TSDB_CODE_SUCCESS; } -typedef struct SmlExecTableHandle { - SParsedDataColInfo tags; // each table - SVCreateTbReq createTblReq; // each table -} SmlExecTableHandle; - -typedef struct SmlExecHandle { - SHashObj* pBlockHash; - SmlExecTableHandle tableExecHandle; - SQuery* pQuery; -} SSmlExecHandle; - -static void smlDestroyTableHandle(void* pHandle) { - SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle; - destroyBoundColumnInfo(&handle->tags); - tdDestroySVCreateTbReq(&handle->createTblReq); -} - -static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema, bool isTag) { - col_id_t nCols = pColList->numOfCols; - - pColList->numOfBound = 0; - pColList->boundNullLen = 0; - memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols); - for (col_id_t i = 0; i < nCols; ++i) { - pColList->cols[i].valStat = VAL_STAT_NONE; +static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) { + bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool)); + if (NULL == pUseCols) { + return TSDB_CODE_OUT_OF_MEMORY; } - bool isOrdered = true; - col_id_t lastColIdx = -1; // last column found + pBoundInfo->numOfBound = 0; + int16_t lastColIdx = -1; // last column found + int32_t code = TSDB_CODE_SUCCESS; + for (int i = 0; i < taosArrayGetSize(cols); ++i) { SSmlKv* kv = taosArrayGetP(cols, i); SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; col_id_t t = lastColIdx + 1; - col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, nCols, pSchema)); - uDebug("SML, index:%d, t:%d, ncols:%d", index, t, nCols); + col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema)); + uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); if (index < 0 && t > 0) { index = insFindCol(&sToken, 0, t, pSchema); - isOrdered = false; } + if (index < 0) { uError("smlBoundColumnData. index:%d", index); - return TSDB_CODE_SML_INVALID_DATA; + code = TSDB_CODE_SML_INVALID_DATA; + goto end; } - if (pColList->cols[index].valStat == VAL_STAT_HAS) { + if (pUseCols[index]) { uError("smlBoundColumnData. already set. index:%d", index); - return TSDB_CODE_SML_INVALID_DATA; + code = TSDB_CODE_SML_INVALID_DATA; + goto end; } lastColIdx = index; - pColList->cols[index].valStat = VAL_STAT_HAS; - pColList->boundColumns[pColList->numOfBound] = index; - ++pColList->numOfBound; - switch (pSchema[t].type) { - case TSDB_DATA_TYPE_BINARY: - pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES); - break; - case TSDB_DATA_TYPE_NCHAR: - pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); - break; - default: - pColList->boundNullLen += TYPE_BYTES[pSchema[t].type]; - break; - } + pUseCols[index] = true; + pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index; + ++pBoundInfo->numOfBound; } - pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; - - if (!isOrdered) { - pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo)); - if (NULL == pColList->colIdxInfo) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - SBoundIdxInfo* pColIdx = pColList->colIdxInfo; - for (col_id_t i = 0; i < pColList->numOfBound; ++i) { - pColIdx[i].schemaColIdx = pColList->boundColumns[i]; - pColIdx[i].boundIdx = i; - } - taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar); - for (col_id_t i = 0; i < pColList->numOfBound; ++i) { - pColIdx[i].finalIdx = i; - } - taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar); - } - - if (pColList->numOfCols > pColList->numOfBound) { - memset(&pColList->boundColumns[pColList->numOfBound], 0, - sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound)); - } +end: + taosMemoryFree(pUseCols); - return TSDB_CODE_SUCCESS; + return code; } /** @@ -146,7 +97,7 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS * @param msg * @return int32_t */ -static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName, +static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName, SMsgBuf* msg) { SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal)); if (!pTagArray) { @@ -159,7 +110,7 @@ static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* p int32_t code = TSDB_CODE_SUCCESS; for (int i = 0; i < tags->numOfBound; ++i) { - SSchema* pTagSchema = &pSchema[tags->boundColumns[i]]; + SSchema* pTagSchema = &pSchema[tags->pColIndex[i]]; SSmlKv* kv = taosArrayGetP(cols, i); taosArrayPush(*tagName, pTagSchema->name); @@ -207,153 +158,165 @@ end: return code; } -int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, +int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) { SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; - smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table SSchema* pTagsSchema = getTableTagSchema(pTableMeta); - insSetBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta)); - int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, true); + SBoundColInfo bindTags = {0}; + SVCreateTbReq *pCreateTblReq = NULL; + SArray* tagName = NULL; + + insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags); + int ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true); if (ret != TSDB_CODE_SUCCESS) { buildInvalidOperationMsg(&pBuf, "bound tags error"); - return ret; + goto end; } + STag* pTag = NULL; - SArray* tagName = NULL; - ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &tagName, &pBuf); + + ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf); if (ret != TSDB_CODE_SUCCESS) { - taosArrayDestroy(tagName); - return ret; + goto end; } - insBuildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, + pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); + if (NULL == pCreateTblReq) { + ret = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags, ttl); - taosArrayDestroy(tagName); - smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1); - memcpy(smlHandle->tableExecHandle.createTblReq.ctb.stbName, sTableName, sTableNameLen); - smlHandle->tableExecHandle.createTblReq.ctb.stbName[sTableNameLen] = 0; + pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1); + memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen); - STableDataBlocks* pDataBlock = NULL; - ret = insGetDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), - TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, - pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq); + STableDataCxt* pTableCxt = NULL; + ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), + pTableMeta, &pCreateTblReq, &pTableCxt, false); if (ret != TSDB_CODE_SUCCESS) { - buildInvalidOperationMsg(&pBuf, "create data block error"); - return ret; + buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error"); + goto end; } SSchema* pSchema = getTableColumnSchema(pTableMeta); - - ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema, false); + ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false); if (ret != TSDB_CODE_SUCCESS) { buildInvalidOperationMsg(&pBuf, "bound cols error"); - return ret; + goto end; } - int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock); - SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; - SRowBuilder* pBuilder = &pDataBlock->rowBuilder; - SMemParam param = {.rb = pBuilder}; - insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo); + ret = initTableColSubmitData(pTableCxt); + if (ret != TSDB_CODE_SUCCESS) { + buildInvalidOperationMsg(&pBuf, "initTableColSubmitData error"); + goto end; + } int32_t rowNum = taosArrayGetSize(cols); if (rowNum <= 0) { - return buildInvalidOperationMsg(&pBuf, "cols size <= 0"); - } - ret = insAllocateMemForSize(pDataBlock, extendedRowSize * rowNum); - if (ret != TSDB_CODE_SUCCESS) { - buildInvalidOperationMsg(&pBuf, "allocate memory error"); - return ret; + ret = buildInvalidOperationMsg(&pBuf, "cols size <= 0"); + goto end; } + for (int32_t r = 0; r < rowNum; ++r) { - STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header - tdSRowResetBuf(pBuilder, row); void* rowData = taosArrayGetP(cols, r); - size_t rowDataSize = 0; - if (format) { - rowDataSize = taosArrayGetSize(rowData); - } // 1. set the parsed value from sql string - for (int c = 0, j = 0; c < spd->numOfBound; ++c) { - SSchema* pColSchema = &pSchema[spd->boundColumns[c]]; - - param.schema = pColSchema; - insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); - + for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) { + SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]]; + SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]); SSmlKv* kv = NULL; - if (format) { - if (j < rowDataSize) { - kv = taosArrayGetP(rowData, j); - if (rowDataSize != spd->numOfBound && j != 0 && - (kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) { - kv = NULL; - } else { - j++; - } - } - } else { + if (!format){ void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); if (p) kv = *p; } - if (kv) { - int32_t colLen = kv->length; - if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { - uDebug("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); - kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); - uDebug("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision); + if (kv == NULL) { + continue; + } + if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { + kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); + } + if (kv->type == TSDB_DATA_TYPE_NCHAR){ + int32_t len = 0; + char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE); + if (NULL == pUcs4) { + ret = TSDB_CODE_OUT_OF_MEMORY; + goto end; } - - if (IS_VAR_DATA_TYPE(kv->type)) { - insMemRowAppend(&pBuf, kv->value, colLen, ¶m); - } else { - insMemRowAppend(&pBuf, &(kv->value), colLen, ¶m); + if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { + if (errno == E2BIG) { + buildInvalidOperationMsg(&pBuf, "value too long"); + ret = TSDB_CODE_PAR_VALUE_TOO_LONG; + goto end; + } + ret = buildInvalidOperationMsg(&pBuf, strerror(errno)); + goto end; } + pVal->value.pData = pUcs4; + pVal->value.nData = len; + } else if(kv->type == TSDB_DATA_TYPE_BINARY) { + pVal->value.nData = kv->length; + pVal->value.pData = (uint8_t *)kv->value; } else { - pBuilder->hasNone = true; - } - - if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { - TSKEY tsKey = TD_ROW_KEY(row); - insCheckTimestamp(pDataBlock, (const char*)&tsKey); + memcpy(&pVal->value.val, &(kv->value), kv->length); } + pVal->flag = CV_FLAG_VALUE; } - // set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { - pBuilder->hasNone = true; + SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); + ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); + if (TSDB_CODE_SUCCESS != ret) { + buildInvalidOperationMsg(&pBuf, "tRowBuild error"); + goto end; } - - tdSRowEnd(pBuilder); - pDataBlock->size += extendedRowSize; + insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow)); } - SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); - return insSetBlockInfo(pBlocks, pDataBlock, rowNum, &pBuf); +end: + destroyBoundColInfo(&bindTags); + taosMemoryFree(pCreateTblReq); + taosArrayDestroy(tagName); + return ret; } -void* smlInitHandle(SQuery* pQuery) { - SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle)); - if (!handle) return NULL; - handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - handle->pQuery = pQuery; - - return handle; -} +SQuery* smlInitHandle() { + SQuery *pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create pQuery error"); + return NULL; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == stmt) { + uError("create SVnodeModifOpStmt error"); + qDestroyQuery(pQuery); + return NULL; + } + stmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + stmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + stmt->freeHashFunc = insDestroyTableDataCxtHashMap; + stmt->freeArrayFunc = insDestroyVgroupDataCxtList; -void smlDestroyHandle(void* pHandle) { - if (!pHandle) return; - SSmlExecHandle* handle = (SSmlExecHandle*)pHandle; - insDestroyBlockHashmap(handle->pBlockHash); - smlDestroyTableHandle(&handle->tableExecHandle); - taosMemoryFree(handle); + pQuery->pRoot = (SNode *)stmt; + return pQuery; } -int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) { - SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; - return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash); +int32_t smlBuildOutput(SQuery * handle, SHashObj* pVgHash) { + SVnodeModifOpStmt *pStmt = (SVnodeModifOpStmt*)(handle)->pRoot; + // merge according to vgId + int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks); + if (code != TSDB_CODE_SUCCESS) { + uError("insMergeTableDataCxt failed"); + return code; + } + code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); + if (code != TSDB_CODE_SUCCESS) { + uError("insBuildVgDataBlocks failed"); + return code; + } + return code; } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 2b01ad4a37f2a72324ad13358b2f945ef1ce3d95..a0a975b225991246de6564a417d5390a47f42b68 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1371,8 +1371,6 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod return code; } -static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); } - static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { destroyBoundColInfo(&pCxt->tags); taosMemoryFreeClear(pStmt->pTableMeta); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index bd74789d9a09720f2b5a17fb42467932a6c26ad5..517cc24ff69800d549df3d55a719030a9d613406 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -22,13 +22,6 @@ #include "ttime.h" #include "ttypes.h" -typedef struct SKvParam { - int16_t pos; - SArray* pTagVals; - SSchema* schema; - char buf[TSDB_MAX_TAGS_LEN]; -} SKvParam; - int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { int32_t code = TSDB_CODE_SUCCESS; SArray* pVgDataBlocks = NULL; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 66e5550eb7119cfe2c7418525e1ccd4b1f6c7f0f..c589526e0c0edf2c012d814d678a24a7618da3f4 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -340,8 +340,8 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) { void** p1 = taosHashIterate(pDataBlockHash, NULL); while (p1) { - STableDataBlocks* pBlocks = *p1; - insDestroyDataBlock(pBlocks); + SBoundColInfo* pBlocks = *p1; + destroyBoundColInfo(pBlocks); p1 = taosHashIterate(pDataBlockHash, p1); } @@ -1083,8 +1083,9 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode) { - *pTableCxt = taosHashGet(pHash, id, idLen); - if (NULL != *pTableCxt) { + STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen); + if (NULL != tmp) { + *pTableCxt = *tmp; return TSDB_CODE_SUCCESS; } int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);