未验证 提交 7d700361 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #9991 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
...@@ -51,7 +51,7 @@ static bool validPassword(const char* passwd) { ...@@ -51,7 +51,7 @@ static bool validPassword(const char* passwd) {
} }
static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *auth, const char *db, static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *auth, const char *db,
uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, TAOS **taos) { void (*fp)(void *, TAOS_RES *, int), void *param, TAOS **taos) {
if (taos_init()) { if (taos_init()) {
return NULL; return NULL;
} }
...@@ -186,7 +186,7 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) { ...@@ -186,7 +186,7 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db,
uint16_t port) { uint16_t port) {
STscObj *pObj = NULL; STscObj *pObj = NULL;
SSqlObj *pSql = taosConnectImpl(ip, user, pass, auth, db, port, syncConnCallback, NULL, (void **)&pObj); SSqlObj *pSql = taosConnectImpl(ip, user, pass, auth, db, syncConnCallback, NULL, (void **)&pObj);
if (pSql != NULL) { if (pSql != NULL) {
pSql->fp = syncConnCallback; pSql->fp = syncConnCallback;
pSql->param = pSql; pSql->param = pSql;
...@@ -262,7 +262,7 @@ static void asyncConnCallback(void *param, TAOS_RES *tres, int code) { ...@@ -262,7 +262,7 @@ static void asyncConnCallback(void *param, TAOS_RES *tres, int code) {
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, TAOS **taos) { void *param, TAOS **taos) {
STscObj *pObj = NULL; STscObj *pObj = NULL;
SSqlObj *pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, (void **)&pObj); SSqlObj *pSql = taosConnectImpl(ip, user, pass, NULL, db, asyncConnCallback, param, (void **)&pObj);
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
} }
......
...@@ -77,10 +77,10 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str ...@@ -77,10 +77,10 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
/** /**
* Process the query job, generated according to the query physical plan. * Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety. * This is a asynchronized API, and is also thread-safety.
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return * @return
*/ */
int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob); int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** pJob);
/** /**
* Fetch query result from the remote query executor * Fetch query result from the remote query executor
......
...@@ -101,13 +101,13 @@ struct SAppInstInfo { ...@@ -101,13 +101,13 @@ struct SAppInstInfo {
}; };
typedef struct SAppInfo { typedef struct SAppInfo {
int64_t startTime; int64_t startTime;
char appName[TSDB_APP_NAME_LEN]; char appName[TSDB_APP_NAME_LEN];
char *ep; char *ep;
int32_t pid; int32_t pid;
int32_t numOfThreads; int32_t numOfThreads;
SHashObj *pInstMap;
SHashObj *pInstMap; pthread_mutex_t mutex;
} SAppInfo; } SAppInfo;
typedef struct STscObj { typedef struct STscObj {
...@@ -192,7 +192,7 @@ uint64_t generateRequestId(); ...@@ -192,7 +192,7 @@ uint64_t generateRequestId();
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest); void destroyRequest(SRequestObj* pRequest);
char *getConnectionDB(STscObj* pObj); char *getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db); void setConnectionDB(STscObj* pTscObj, const char* db);
void taos_init_imp(void); void taos_init_imp(void);
......
...@@ -253,10 +253,11 @@ void taos_init_imp(void) { ...@@ -253,10 +253,11 @@ void taos_init_imp(void) {
clientReqRefPool = taosOpenRef(40960, doDestroyRequest); clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
taosGetAppName(appInfo.appName, NULL); taosGetAppName(appInfo.appName, NULL);
pthread_mutex_init(&appInfo.mutex, NULL);
appInfo.pid = taosGetPId(); appInfo.pid = taosGetPId();
appInfo.startTime = taosGetTimestampMs(); appInfo.startTime = taosGetTimestampMs();
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tscDebug("client is initialized successfully"); tscDebug("client is initialized successfully");
} }
......
...@@ -58,7 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i ...@@ -58,7 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
return strdup(key); return strdup(key);
} }
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo); static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
...@@ -110,9 +110,11 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -110,9 +110,11 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
} }
char* key = getClusterKey(user, secretEncrypt, ip, port); char* key = getClusterKey(user, secretEncrypt, ip, port);
SAppInstInfo** pInst = NULL;
// TODO: race condition here. pthread_mutex_lock(&appInfo.mutex);
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL) { if (pInst == NULL) {
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
...@@ -123,8 +125,10 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -123,8 +125,10 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
pInst = &p; pInst = &p;
} }
pthread_mutex_unlock(&appInfo.mutex);
tfree(key); tfree(key);
return taosConnectImpl(user, &secretEncrypt[0], localDb, port, NULL, NULL, *pInst); return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
} }
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) { int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
...@@ -155,7 +159,7 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { ...@@ -155,7 +159,7 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
SParseContext cxt = { SParseContext cxt = {
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.acctId = pTscObj->acctId, .acctId = pTscObj->acctId,
.db = getConnectionDB(pTscObj), .db = getDbOfConnection(pTscObj),
.pSql = pRequest->sqlstr, .pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen, .sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf, .pMsg = pRequest->msgBuf,
...@@ -238,9 +242,6 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t ...@@ -238,9 +242,6 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
taosArrayDestroy(pNodeList);
int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// handle error and retry // handle error and retry
...@@ -645,6 +646,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -645,6 +646,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
SQueryNode *pQueryNode = NULL; SQueryNode *pQueryNode = NULL;
SArray *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
...@@ -653,7 +655,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -653,7 +655,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
if (qIsDdlQuery(pQueryNode)) { if (qIsDdlQuery(pQueryNode)) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return); CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
} else { } else {
SArray *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return); CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
...@@ -661,6 +662,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -661,6 +662,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
} }
_return: _return:
taosArrayDestroy(pNodeList);
qDestroyQuery(pQueryNode); qDestroyQuery(pQueryNode);
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
pRequest->code = terrno; pRequest->code = terrno;
...@@ -705,7 +707,7 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe ...@@ -705,7 +707,7 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
return 0; return 0;
} }
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) { STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo); STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
if (NULL == pTscObj) { if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -763,7 +765,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) { ...@@ -763,7 +765,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
STscObj *pObj = pRequest->pTscObj; STscObj *pObj = pRequest->pTscObj;
char* db = getConnectionDB(pObj); char* db = getDbOfConnection(pObj);
if (db != NULL) { if (db != NULL) {
tstrncpy(pConnect->db, db, sizeof(pConnect->db)); tstrncpy(pConnect->db, db, sizeof(pConnect->db));
} }
...@@ -995,7 +997,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t ...@@ -995,7 +997,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
} }
} }
char* getConnectionDB(STscObj* pObj) { char* getDbOfConnection(STscObj* pObj) {
char *p = NULL; char *p = NULL;
pthread_mutex_lock(&pObj->mutex); pthread_mutex_lock(&pObj->mutex);
size_t len = strlen(pObj->db); size_t len = strlen(pObj->db);
......
此差异已折叠。
...@@ -91,16 +91,18 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, ...@@ -91,16 +91,18 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, // extract the final result schema
int32_t* numOfCols) { // extract the final result schema void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, int32_t* numOfCols) {
SArray* pTopSubplan = taosArrayGetP((*pDag)->pSubplans, 0); SArray* pTopSubplan = taosArrayGetP((*pDag)->pSubplans, 0);
SSubplan* pPlan = taosArrayGetP(pTopSubplan, 0); SSubplan* pPlan = taosArrayGetP(pTopSubplan, 0);
SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema); SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
*numOfCols = pDataBlockSchema->numOfCols; *numOfCols = pDataBlockSchema->numOfCols;
*pResSchema = calloc(pDataBlockSchema->numOfCols, sizeof(SSchema)); if (*numOfCols > 0) {
memcpy((*pResSchema), pDataBlockSchema->pSchema, pDataBlockSchema->numOfCols * sizeof(SSchema)); *pResSchema = calloc(pDataBlockSchema->numOfCols, sizeof(SSchema));
memcpy((*pResSchema), pDataBlockSchema->pSchema, pDataBlockSchema->numOfCols * sizeof(SSchema));
}
} }
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
......
...@@ -1039,7 +1039,7 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t ...@@ -1039,7 +1039,7 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
} }
qDebug("QID:%"PRIx64 ",TID:%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType)); qDebug("QID:%"PRIx64 ",TID:%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
...@@ -1297,11 +1297,11 @@ void schDropJobAllTasks(SSchJob *pJob) { ...@@ -1297,11 +1297,11 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->failTasks); schDropTaskInHashList(pJob, pJob->failTasks);
} }
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) { int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
qDebug("QID:%"PRIx64" job started", pDag->queryId); qDebug("QID:0x%"PRIx64" job started", pDag->queryId);
if (nodeList && taosArrayGetSize(nodeList) <= 0) { if (pNodeList && taosArrayGetSize(pNodeList) <= 0) {
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); qDebug("QID:0x%"PRIx64" input exec nodeList is empty", pDag->queryId);
} }
int32_t code = 0; int32_t code = 0;
...@@ -1313,7 +1313,10 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struc ...@@ -1313,7 +1313,10 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struc
pJob->attr.syncSchedule = syncSchedule; pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport; pJob->transport = transport;
pJob->nodeList = nodeList;
if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList);
}
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
...@@ -1429,12 +1432,12 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str ...@@ -1429,12 +1432,12 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) { int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, false)); SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, false));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1719,7 +1722,7 @@ void schedulerFreeJob(void *job) { ...@@ -1719,7 +1722,7 @@ void schedulerFreeJob(void *job) {
tfree(pJob); tfree(pJob);
qDebug("QID:%"PRIx64" job freed", queryId); qDebug("QID:0x%"PRIx64" job freed", queryId);
} }
void schedulerFreeTaskList(SArray *taskList) { void schedulerFreeTaskList(SArray *taskList) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册