提交 b1e781b1 编写于 作者: D dapan1121

fix bug

上级 9a64b8a4
...@@ -54,6 +54,7 @@ typedef struct { ...@@ -54,6 +54,7 @@ typedef struct {
typedef struct SCqObj { typedef struct SCqObj {
tmr_h tmrId; tmr_h tmrId;
int64_t rid;
uint64_t uid; uint64_t uid;
int32_t tid; // table ID int32_t tid; // table ID
int32_t rowSize; // bytes of a row int32_t rowSize; // bytes of a row
...@@ -69,6 +70,77 @@ typedef struct SCqObj { ...@@ -69,6 +70,77 @@ typedef struct SCqObj {
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
int32_t cqObjRef = -1;
void cqFree(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj *pObj = handle;
SCqContext *pContext = pObj->pContext;
int32_t last = 0;
pthread_mutex_lock(&pContext->mutex);
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pContext->pHead = pObj->next;
}
if (pObj->next) {
pObj->next->prev = pObj->prev;
}
if (pContext->pHead == NULL) {
last = 1;
}
// free the resources associated
if (pObj->pStream) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
}
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->dstTable);
free(pObj->sqlStr);
free(pObj);
pthread_mutex_unlock(&pContext->mutex);
if (last) {
pthread_mutex_unlock(&pContext->mutex);
pthread_mutex_destroy(&pContext->mutex);
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
}
}
void cqCreateRef() {
int32_t ref = atomic_load_32(&cqObjRef);
if (ref == -1) {
ref = taosOpenRef(4096, cqFree);
if (atomic_val_compare_exchange_32(&cqObjRef, -1, ref) != -1) {
taosCloseRef(ref);
}
}
}
void *cqOpen(void *ahandle, const SCqCfg *pCfg) { void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
if (tsEnableStream == 0) { if (tsEnableStream == 0) {
return NULL; return NULL;
...@@ -79,6 +151,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { ...@@ -79,6 +151,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
return NULL; return NULL;
} }
cqCreateRef();
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ"); pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user)); tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
...@@ -97,6 +171,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { ...@@ -97,6 +171,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
pthread_mutex_init(&pContext->mutex, NULL); pthread_mutex_init(&pContext->mutex, NULL);
cDebug("vgId:%d, CQ is opened", pContext->vgId); cDebug("vgId:%d, CQ is opened", pContext->vgId);
return pContext; return pContext;
...@@ -119,20 +194,9 @@ void cqClose(void *handle) { ...@@ -119,20 +194,9 @@ void cqClose(void *handle) {
while (pObj) { while (pObj) {
SCqObj *pTemp = pObj; SCqObj *pTemp = pObj;
pObj = pObj->next; pObj = pObj->next;
tdFreeSchema(pTemp->pSchema);
tfree(pTemp->sqlStr);
free(pTemp);
}
pthread_mutex_unlock(&pContext->mutex);
pthread_mutex_destroy(&pContext->mutex);
taosTmrCleanUp(pContext->tmrCtrl); taosReleaseRef(cqObjRef, pTemp->rid);
pContext->tmrCtrl = NULL; }
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
} }
void cqStart(void *handle) { void cqStart(void *handle) {
...@@ -213,10 +277,13 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch ...@@ -213,10 +277,13 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
if (pContext->pHead) pContext->pHead->prev = pObj; if (pContext->pHead) pContext->pHead->prev = pObj;
pContext->pHead = pObj; pContext->pHead = pObj;
pObj->rid = taosAddRef(cqObjRef, pObj);
cqCreateStream(pContext, pObj); cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
return pObj; return pObj;
} }
...@@ -229,16 +296,6 @@ void cqDrop(void *handle) { ...@@ -229,16 +296,6 @@ void cqDrop(void *handle) {
pthread_mutex_lock(&pContext->mutex); pthread_mutex_lock(&pContext->mutex);
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pContext->pHead = pObj->next;
}
if (pObj->next) {
pObj->next->prev = pObj->prev;
}
// free the resources associated // free the resources associated
if (pObj->pStream) { if (pObj->pStream) {
taos_close_stream(pObj->pStream); taos_close_stream(pObj->pStream);
...@@ -248,17 +305,17 @@ void cqDrop(void *handle) { ...@@ -248,17 +305,17 @@ void cqDrop(void *handle) {
pObj->tmrId = 0; pObj->tmrId = 0;
} }
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); taosReleaseRef(cqObjRef, pObj->rid);
tdFreeSchema(pObj->pSchema);
free(pObj->dstTable);
free(pObj->sqlStr);
free(pObj);
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
SCqObj* pObj = (SCqObj*)param; SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
SCqContext* pContext = pObj->pContext; SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result; SSqlObj* pSql = (SSqlObj*)result;
if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
...@@ -267,10 +324,16 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { ...@@ -267,10 +324,16 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
pthread_mutex_lock(&pContext->mutex); pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj); cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
taosReleaseRef(cqObjRef, (int64_t)param);
} }
static void cqProcessCreateTimer(void *param, void *tmrId) { static void cqProcessCreateTimer(void *param, void *tmrId) {
SCqObj* pObj = (SCqObj*)param; SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
SCqContext* pContext = pObj->pContext; SCqContext* pContext = pObj->pContext;
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
...@@ -281,6 +344,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { ...@@ -281,6 +344,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
cqCreateStream(pContext, pObj); cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
taosReleaseRef(cqObjRef, (int64_t)param);
} }
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
...@@ -288,7 +353,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -288,7 +353,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId); cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId);
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl); pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl);
return; return;
} }
pObj->tmrId = 0; pObj->tmrId = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册