未验证 提交 887b1ebf 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #12201 from taosdata/feature/qnode

fix: fix filter issue
......@@ -89,7 +89,7 @@ extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11
#undef TD_DEBUG_PRINT_ROW
#define TD_DEBUG_PRINT_ROW
#ifdef __cplusplus
}
......
......@@ -184,7 +184,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
......
......@@ -88,6 +88,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
......
......@@ -310,6 +310,7 @@ void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
#ifdef __cplusplus
}
......
......@@ -60,6 +60,7 @@ typedef struct SStmtBindInfo {
int32_t sBindRowNum;
int32_t sBindLastIdx;
int8_t tbType;
bool tagsCached;
void* boundTags;
char* tbName;
SName sname;
......
......@@ -405,7 +405,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_MND_INVALID_STB) {
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) {
SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
......
......@@ -29,6 +29,11 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
/*
if ((pStmt->sql.type == STMT_TYPE_MULTI_INSERT) && ()) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
*/
break;
case STMT_BIND_COL:
if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
......@@ -123,6 +128,7 @@ int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.boundTags = tags;
pStmt->bInfo.tagsCached = false;
return TSDB_CODE_SUCCESS;
}
......@@ -207,8 +213,6 @@ int32_t stmtParseSql(STscStmt* pStmt) {
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
}
STMT_ERR_RET(stmtCacheBlock(pStmt));
return TSDB_CODE_SUCCESS;
}
......@@ -219,8 +223,10 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
pStmt->bInfo.needParse = true;
taosMemoryFreeClear(pStmt->bInfo.tbName);
destroyBoundColumnInfo(pStmt->bInfo.boundTags);
taosMemoryFreeClear(pStmt->bInfo.boundTags);
if (!pStmt->bInfo.tagsCached) {
destroyBoundColumnInfo(pStmt->bInfo.boundTags);
taosMemoryFreeClear(pStmt->bInfo.boundTags);
}
return TSDB_CODE_SUCCESS;
}
......@@ -275,6 +281,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
qDestroyStmtDataBlock(pCache->pDataBlock);
destroyBoundColumnInfo(pCache->boundTags);
taosMemoryFreeClear(pCache->boundTags);
pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
}
......@@ -302,7 +309,15 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));
int32_t code = catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS;
}
STMT_ERR_RET(code);
uint64_t uid = pTableMeta->uid;
uint64_t suid = pTableMeta->suid;
int8_t tableType = pTableMeta->tableType;
......@@ -328,6 +343,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.tbSuid = suid;
pStmt->bInfo.tbType = tableType;
pStmt->bInfo.boundTags = pCache->boundTags;
pStmt->bInfo.tagsCached = true;
return TSDB_CODE_SUCCESS;
}
......@@ -340,6 +356,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.tbSuid = suid;
pStmt->bInfo.tbType = tableType;
pStmt->bInfo.boundTags = pCache->boundTags;
pStmt->bInfo.tagsCached = true;
STableDataBlocks* pNewBlock = NULL;
STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock));
......@@ -448,10 +465,12 @@ int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
if (!pStmt->bInfo.needParse) {
return TSDB_CODE_SUCCESS;
}
STMT_ERR_RET(stmtParseSql(pStmt));
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
......@@ -501,8 +520,6 @@ int32_t stmtFetchColFields(STscStmt* pStmt, int32_t *fieldNum, TAOS_FIELD** fiel
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
......@@ -520,6 +537,8 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (STMT_TYPE_QUERY == pStmt->sql.type) {
if (NULL == pStmt->sql.pQueryPlan) {
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
......@@ -586,6 +605,16 @@ int stmtExec(TAOS_STMT *stmt) {
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);
}
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
if (code) {
pStmt->exec.pRequest->code = code;
} else {
STMT_ERR_RET(stmtResetStmt(pStmt));
STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
}
}
STMT_ERR_JRET(pStmt->exec.pRequest->code);
......@@ -613,13 +642,11 @@ int stmtClose(TAOS_STMT *stmt) {
const char *stmtErrstr(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (stmt == NULL) {
if (stmt == NULL || NULL == pStmt->exec.pRequest) {
return (char*) tstrerror(terrno);
}
if (pStmt->exec.pRequest) {
pStmt->exec.pRequest->code = terrno;
}
pStmt->exec.pRequest->code = terrno;
return taos_errstr(pStmt->exec.pRequest);
}
......
......@@ -1233,21 +1233,21 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
for (int32_t i = 0; i < numOfCreatedDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db) + 1;
int32_t len = strlen(db);
taosHashPut(pRsp->createdDbs, db, len, db, len);
}
for (int32_t i = 0; i < numOfReadDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db) + 1;
int32_t len = strlen(db);
taosHashPut(pRsp->readDbs, db, len, db, len);
}
for (int32_t i = 0; i < numOfWriteDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db) + 1;
int32_t len = strlen(db);
taosHashPut(pRsp->writeDbs, db, len, db, len);
}
......
......@@ -367,8 +367,8 @@ TEST_F(MndTestUser, 03_Alter_User) {
EXPECT_EQ(numOfReadDbs, 1);
EXPECT_EQ(numOfWriteDbs, 0);
char* dbname = (char*)taosHashGet(authRsp.readDbs, "1.d2", 5);
EXPECT_STREQ(dbname, "1.d2");
char* dbname = (char*)taosHashGet(authRsp.readDbs, "1.d2", 4);
EXPECT_TRUE(dbname != NULL);
taosHashCleanup(authRsp.readDbs);
taosHashCleanup(authRsp.writeDbs);
......
......@@ -326,6 +326,11 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);
#ifdef TD_DEBUG_PRINT_ROW
printf("!!! %s:%d table %" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, pTbData->uid,
SL_SIZE(pTbData->pData));
#endif
// Set statistics
keyMax = TD_ROW_KEY(blkIter.row);
......
......@@ -3879,7 +3879,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
if (metaGetTableEntryByUid(&mr, uid) < 0) {
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
goto _error;
} else {
tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
......@@ -3949,7 +3949,7 @@ int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGr
metaReaderInit(&mr, (SMeta*)pMeta, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
goto _error;
}
......
......@@ -497,6 +497,47 @@ _exit:
return 0;
}
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char* tags) {
ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0};
SMeta *pMeta = pVnode->pMeta;
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
STSchema *pSchema = NULL;
tb_uid_t suid = 0;
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
if (blkIter.row == NULL) continue;
if (!pSchema || (suid != msgIter.suid)) {
if (pSchema) {
taosMemoryFreeClear(pSchema);
}
pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema
if(pSchema) {
suid = msgIter.suid;
}
}
if(!pSchema) {
printf("%s:%d no valid schema\n", tags, __LINE__);
continue;
}
char __tags[128] = {0};
snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter.uid);
while ((row = tGetSubmitBlkNext(&blkIter))) {
tdSRowPrint(row, pSchema, __tags);
}
}
taosMemoryFreeClear(pSchema);
return 0;
}
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitMsgIter msgIter = {0};
......@@ -508,6 +549,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
pRsp->code = 0;
#ifdef TD_DEBUG_PRINT_ROW
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif
// handle the request
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG;
......@@ -550,7 +595,8 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
goto _exit;
}
rsp.numOfRows += nRows;
rsp.affectedRows += nRows;
}
_exit:
......
......@@ -36,7 +36,7 @@ extern "C" {
#define CTG_DEFAULT_INVALID_VERSION (-1)
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_TDB_INVALID_TABLE_ID
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_PAR_TABLE_NOT_EXIST
enum {
CTG_READ = 1,
......
......@@ -4199,7 +4199,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
}
}
taosMemoryFree(pOperator->pExpr);
taosMemoryFreeClear(pOperator->pExpr);
taosMemoryFreeClear(pOperator->info);
taosMemoryFreeClear(pOperator);
}
......@@ -4384,6 +4384,9 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
}
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
if (NULL == param) {
return;
}
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupAggSup(&pInfo->aggSup);
......
......@@ -241,6 +241,15 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool
SParseContext* pBasicCtx = pCxt->pComCxt;
SName name = {0};
createSName(&name, pTname, pBasicCtx->acctId, pBasicCtx->db, &pCxt->msg);
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
bool pass = false;
CHECK_CODE(catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, dbFname, AUTH_TYPE_WRITE, &pass));
if (!pass) {
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
if (isStb) {
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
&pCxt->pTableMeta));
......@@ -1151,6 +1160,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
(*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
pCxt->pVgroupsHashObj = NULL;
pCxt->pTableBlockHashObj = NULL;
pCxt->pTableMeta = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -1276,7 +1286,7 @@ int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tN
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
SSchema* pSchema = pDataBlock->pTableMeta->schema;
SKvParam param = {.builder = &tagBuilder};
for (int c = 0; c < tags->numOfBound; ++c) {
......
......@@ -469,7 +469,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) + pOneTableBlock->createTbReqLen;
if (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
......@@ -601,6 +601,7 @@ int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
pBlock->numOfTables = 1;
pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
pBlock->headerSize = pBlock->size;
pBlock->createTbReqLen = 0;
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
......
......@@ -2579,7 +2579,7 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt
SName tableName;
int32_t code = getTableMetaImpl(
pCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta);
if ((TSDB_CODE_TDB_INVALID_TABLE_ID == code || TSDB_CODE_VND_TB_NOT_EXIST == code) && pClause->ignoreNotExists) {
if ((TSDB_CODE_PAR_TABLE_NOT_EXIST == code || TSDB_CODE_VND_TB_NOT_EXIST == code) && pClause->ignoreNotExists) {
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -3150,7 +3150,7 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
req.alterType = TSDB_ALTER_USER_ADD_WRITE_DB;
}
strcpy(req.user, pStmt->userName);
strcpy(req.dbname, pStmt->dbName);
sprintf(req.dbname, "%d.%s", pCxt->pParseCxt->acctId, pStmt->dbName);
return buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &req);
}
......
......@@ -3577,6 +3577,22 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
}
if (QUERY_NODE_NODE_LIST == nodeType(*pNode)) {
SNodeListNode *listNode = (SNodeListNode *)*pNode;
if (QUERY_NODE_VALUE != nodeType(listNode->pNodeList->pHead->pNode)) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
SValueNode *valueNode = (SValueNode *)listNode->pNodeList->pHead->pNode;
uint8_t type = valueNode->node.resType.type;
SNode *node = NULL;
FOREACH(node, listNode->pNodeList) {
if (type != ((SValueNode *)node)->node.resType.type) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
}
return DEAL_RES_CONTINUE;
}
......
......@@ -265,13 +265,53 @@ static FORCE_INLINE void varToTimestamp(char *buf, SScalarParam* pOut, int32_t r
}
static FORCE_INLINE void varToSigned(char *buf, SScalarParam* pOut, int32_t rowIndex) {
int64_t value = strtoll(buf, NULL, 10);
colDataAppendInt64(pOut->columnData, rowIndex, &value);
switch (pOut->columnData->info.type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t value = (int8_t)strtoll(buf, NULL, 10);
colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*)&value);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t value = (int16_t)strtoll(buf, NULL, 10);
colDataAppendInt16(pOut->columnData, rowIndex, (int16_t*)&value);
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t value = (int32_t)strtoll(buf, NULL, 10);
colDataAppendInt32(pOut->columnData, rowIndex, (int32_t*)&value);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t value = (int64_t)strtoll(buf, NULL, 10);
colDataAppendInt64(pOut->columnData, rowIndex, (int64_t*)&value);
break;
}
}
}
static FORCE_INLINE void varToUnsigned(char *buf, SScalarParam* pOut, int32_t rowIndex) {
uint64_t value = strtoull(buf, NULL, 10);
colDataAppendInt64(pOut->columnData, rowIndex, (int64_t*) &value);
switch (pOut->columnData->info.type) {
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t value = (uint8_t)strtoull(buf, NULL, 10);
colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*)&value);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t value = (uint16_t)strtoull(buf, NULL, 10);
colDataAppendInt16(pOut->columnData, rowIndex, (int16_t*)&value);
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t value = (uint32_t)strtoull(buf, NULL, 10);
colDataAppendInt32(pOut->columnData, rowIndex, (int32_t*)&value);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t value = (uint64_t)strtoull(buf, NULL, 10);
colDataAppendInt64(pOut->columnData, rowIndex, (int64_t*)&value);
break;
}
}
}
static FORCE_INLINE void varToFloat(char *buf, SScalarParam* pOut, int32_t rowIndex) {
......@@ -453,6 +493,71 @@ void convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t
}
}
int32_t vectorConvertToVarData(const SScalarParam* pIn, SScalarParam* pOut, int16_t inType, int16_t outType) {
SColumnInfoData* pInputCol = pIn->columnData;
SColumnInfoData* pOutputCol = pOut->columnData;
char tmp[128] = {0};
if (IS_SIGNED_NUMERIC_TYPE(inType) || inType == TSDB_DATA_TYPE_BOOL || inType == TSDB_DATA_TYPE_TIMESTAMP) {
for (int32_t i = 0; i < pIn->numOfRows; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i);
continue;
}
int64_t value = 0;
GET_TYPED_DATA(value, int64_t, inType, colDataGetData(pInputCol, i));
int32_t len = sprintf(varDataVal(tmp), "%" PRId64, value);
varDataLen(tmp) = len;
if (outType == TSDB_DATA_TYPE_NCHAR) {
varToNchar(tmp, pOut, i);
} else {
colDataAppend(pOutputCol, i, (char *)&value, false);
}
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(inType)) {
for (int32_t i = 0; i < pIn->numOfRows; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i);
continue;
}
uint64_t value = 0;
GET_TYPED_DATA(value, uint64_t, inType, colDataGetData(pInputCol, i));
int32_t len = sprintf(varDataVal(tmp), "%" PRIu64, value);
varDataLen(tmp) = len;
if (outType == TSDB_DATA_TYPE_NCHAR) {
varToNchar(tmp, pOut, i);
} else {
colDataAppend(pOutputCol, i, (char *)&value, false);
}
}
} else if (IS_FLOAT_TYPE(inType)) {
for (int32_t i = 0; i < pIn->numOfRows; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i);
continue;
}
double value = 0;
GET_TYPED_DATA(value, double, inType, colDataGetData(pInputCol, i));
int32_t len = sprintf(varDataVal(tmp), "%lf", value);
varDataLen(tmp) = len;
if (outType == TSDB_DATA_TYPE_NCHAR) {
varToNchar(tmp, pOut, i);
} else {
colDataAppend(pOutputCol, i, (char *)&value, false);
}
}
} else {
sclError("not supported input type:%d", inType);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
// TODO opt performance
int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
SColumnInfoData* pInputCol = pIn->columnData;
......@@ -610,6 +715,10 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
}
break;
}
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
return vectorConvertToVarData(pIn, pOut, inType, outType);
}
default:
sclError("invalid convert output type:%d", outType);
return TSDB_CODE_QRY_APP_ERROR;
......
......@@ -94,6 +94,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed
TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册