提交 d2761b41 编写于 作者: H Haojun Liao

ehn(query): refactor c driver codes.

上级 2b2c2ad5
......@@ -43,13 +43,23 @@ extern "C" {
} \
} while (0)
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms
// todo refactor
enum {
CONN_TYPE__QUERY = 1,
CONN_TYPE__TMQ,
};
enum {
RES_TYPE__QUERY = 1,
RES_TYPE__TMQ,
};
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
typedef struct SAppInstInfo SAppInstInfo;
typedef struct {
......@@ -172,33 +182,15 @@ typedef struct SReqResultInfo {
int32_t payloadLen;
} SReqResultInfo;
typedef struct SShowReqInfo {
int64_t execId; // showId/queryId
int32_t vgId;
SArray* pArray; // SArray<SVgroupInfo>
int32_t currentIndex; // current accessed vgroup index.
} SShowReqInfo;
typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now
void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG.
struct SQueryPlan* pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo;
} SRequestSendRecvBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
enum {
RES_TYPE__QUERY = 1,
RES_TYPE__TMQ,
};
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
typedef struct {
int8_t resType;
char* topic;
......@@ -217,7 +209,6 @@ typedef struct SRequestObj {
int32_t sqlLen;
int64_t self;
char* msgBuf;
void* pInfo; // sql parse info, generated by parser module
int32_t code;
SArray* dbList;
SArray* tableList;
......@@ -252,7 +243,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int taos_init();
int taos_init();
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj);
......@@ -266,7 +257,6 @@ char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);
void resetConnectDB(STscObj* pTscObj);
void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char* str);
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
......@@ -286,8 +276,7 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
......
......@@ -33,7 +33,7 @@ SAppInfo appInfo;
int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1;
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj *pRequest) {
......@@ -188,7 +188,6 @@ static void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pInfo);
taosMemoryFreeClear(pRequest->pDb);
doFreeReqResultInfo(&pRequest->body.resInfo);
......@@ -198,10 +197,6 @@ static void doDestroyRequest(void *p) {
schedulerFreeJob(pRequest->body.queryJob);
}
if (pRequest->body.showInfo.pArray != NULL) {
taosArrayDestroy(pRequest->body.showInfo.pArray);
}
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "clientInt.h"
#include "clientLog.h"
......
......@@ -193,7 +193,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
} else {
// assert to avoid uninitialization error
// assert to avoid un-initialization error
ASSERT(0);
}
return NULL;
......@@ -355,6 +355,7 @@ int taos_result_precision(TAOS_RES *res) {
if (res == NULL) {
return TSDB_TIME_PRECISION_MILLI;
}
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
return pRequest->body.resInfo.precision;
......@@ -467,6 +468,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (res == NULL) {
return 0;
}
if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
if (pResultInfo == NULL) {
......
......@@ -117,10 +117,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
struct SCatalog *pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) {
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code));
tstrerror(code1));
} else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
}
......@@ -154,10 +154,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
} else {
struct SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code));
tstrerror(code1));
} else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
}
......@@ -209,84 +209,9 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
void initMsgHandleFp() {
#if 0
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg;
tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg;
tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg;
tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAG_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg;
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
#endif
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
}
......@@ -6815,6 +6815,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
}
SOperatorInfo* pOptr = NULL;
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
......@@ -6822,7 +6823,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
return createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
......@@ -6836,9 +6837,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
return createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL);
pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL);
} else {
return createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -6856,39 +6857,39 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
};
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId;
return createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
return createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
return createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
return createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
} else {
ASSERT(0);
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
......@@ -6901,7 +6902,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
}*/
return NULL;
taosMemoryFree(ops);
return pOptr;
}
static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo,
......
......@@ -1020,7 +1020,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (ctx->phase == QW_PHASE_PRE_QUERY) {
ctx->ctrlConnInfo.handle == qwMsg->connInfo.handle;
ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
needRsp = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册