提交 1f73c106 编写于 作者: B Bomin Zhang

TD-1529: stream async create db connection

and also fix failed test cases related to stream.
上级 f8cbfe74
...@@ -236,13 +236,21 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us ...@@ -236,13 +236,21 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us
return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port); return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port);
} }
static void asyncConnCallback(void *param, TAOS_RES *tres, int code) {
SSqlObj *pSql = (SSqlObj *) tres;
assert(pSql != NULL);
pSql->fetchFp(pSql->param, tres, code);
}
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) { void *param, void **taos) {
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, fp, param, taos); SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, taos);
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
} }
pSql->fetchFp = fp;
pSql->res.code = tscProcessSql(pSql); pSql->res.code = tscProcessSql(pSql);
tscDebug("%p DB async connection is opening", taos); tscDebug("%p DB async connection is opening", taos);
return taos; return taos;
......
...@@ -515,6 +515,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -515,6 +515,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return; return;
} }
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
...@@ -608,6 +612,7 @@ void taos_close_stream(TAOS_STREAM *handle) { ...@@ -608,6 +612,7 @@ void taos_close_stream(TAOS_STREAM *handle) {
* Here, we need a check before release memory * Here, we need a check before release memory
*/ */
if (pSql->signature == pSql) { if (pSql->signature == pSql) {
T_REF_DEC(pSql->pTscObj);
tscRemoveFromStreamList(pStream, pSql); tscRemoveFromStreamList(pStream, pSql);
taosTmrStopA(&(pStream->pTimer)); taosTmrStopA(&(pStream->pTimer));
......
...@@ -2,6 +2,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) ...@@ -2,6 +2,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine) PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
IF (TD_LINUX) IF (TD_LINUX)
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <string.h> #include <string.h>
#include "taos.h" #include "taos.h"
#include "tsclient.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -238,18 +239,23 @@ void cqDrop(void *handle) { ...@@ -238,18 +239,23 @@ void cqDrop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
static void doCreateStream(void *param, TAOS_RES *result, int code) {
SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result;
pContext->dbConn = pSql->pTscObj;
cqCreateStream(pContext, pObj);
}
static void cqProcessCreateTimer(void *param, void *tmrId) { static void cqProcessCreateTimer(void *param, void *tmrId) {
SCqObj* pObj = (SCqObj*)param; SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext; SCqContext* pContext = pObj->pContext;
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
if (pContext->dbConn == NULL) { } else {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
}
}
cqCreateStream(pContext, pObj); cqCreateStream(pContext, pObj);
}
} }
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册