diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt index d41ae09a58a8942e2f033237815060f466ee49d1..e8796306f3ef00893de91b9cb491215327d38012 100644 --- a/src/cq/CMakeLists.txt +++ b/src/cq/CMakeLists.txt @@ -4,6 +4,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index e3df73a883aa6e5bd67fe508399a813db6f88adf..62b9a414944f9e365e066dd93c9578914f897164 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -20,6 +20,7 @@ #include #include "taosdef.h" #include "taosmsg.h" +#include "tglobal.h" #include "tlog.h" #include "twal.h" #include "tcq.h" @@ -32,7 +33,6 @@ typedef struct { int vgId; - char path[TSDB_FILENAME_LEN]; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; FCqWrite cqWrite; @@ -44,12 +44,13 @@ typedef struct { } SCqContext; typedef struct SCqObj { - int sid; // table ID + int tid; // table ID int rowSize; // bytes of a row char *sqlStr; // SQL string int columns; // number of columns SSchema *pSchema; // pointer to schema array void *pStream; + struct SCqObj *prev; struct SCqObj *next; SCqContext *pContext; } SCqObj; @@ -65,30 +66,10 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { strcpy(pContext->user, pCfg->user); strcpy(pContext->pass, pCfg->pass); - strcpy(pContext->path, pCfg->path); pContext->vgId = pCfg->vgId; pContext->cqWrite = pCfg->cqWrite; pContext->ahandle = ahandle; - // open meta data file - - // loop each record - while (1) { - SCqObj *pObj = calloc(sizeof(SCqObj), 1); - if (pObj == NULL) { - cError("vgId:%d, no memory", pContext->vgId); - continue; - } - - pObj->next = pContext->pHead; - pContext->pHead = pObj; - - // assigne each field in SCqObj - // pObj->sid = - // strcpy(pObj->sqlStr, ?? ); - // schema, columns - } - pthread_mutex_init(&pContext->mutex, NULL); cTrace("vgId:%d, CQ is opened", pContext->vgId); @@ -102,8 +83,6 @@ void cqClose(void *handle) { // stop all CQs cqStop(pContext); - // save the meta data - // free all resources SCqObj *pObj = pContext->pHead; while (pObj) { @@ -125,23 +104,23 @@ void cqStart(void *handle) { pthread_mutex_lock(&pContext->mutex); + tscEmbedded = 1; pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); - if (pContext->dbConn) { + if (pContext->dbConn == NULL) { cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); pthread_mutex_unlock(&pContext->mutex); return; } - SCqObj *pObj = pContext->pHead; while (pObj) { int64_t lastKey = 0; pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); if (pObj->pStream) { pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr); + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); } else { - cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->sqlStr); + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); } pObj = pObj->next; } @@ -158,9 +137,11 @@ void cqStop(void *handle) { SCqObj *pObj = pContext->pHead; while (pObj) { - if (pObj->pStream) taos_close_stream(pObj->pStream); - pObj->pStream = NULL; - cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->sid, pObj->sqlStr); + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); + } pObj = pObj->next; } @@ -171,13 +152,13 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns) { +void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) { SCqContext *pContext = handle; SCqObj *pObj = calloc(sizeof(SCqObj), 1); - if (pObj == NULL) return; + if (pObj == NULL) return NULL; - pObj->sid = sid; + pObj->tid = tid; pObj->sqlStr = malloc(strlen(sqlStr)+1); strcpy(pObj->sqlStr, sqlStr); @@ -187,11 +168,12 @@ void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns pObj->pSchema = malloc(size); memcpy(pObj->pSchema, pSchema, size); - cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->sid, pObj->sqlStr); + cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); pthread_mutex_lock(&pContext->mutex); pObj->next = pContext->pHead; + if (pContext->pHead) pContext->pHead->prev = pObj; pContext->pHead = pObj; if (pContext->dbConn) { @@ -199,50 +181,39 @@ void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); if (pObj->pStream) { pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr); + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); } else { - cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->sid, pObj->sqlStr); + cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr); } } pthread_mutex_unlock(&pContext->mutex); + + return pObj; } -void cqDrop(void *handle, int sid) { - SCqContext *pContext = handle; +void cqDrop(void *handle) { + SCqObj *pObj = handle; + SCqContext *pContext = pObj->pContext; pthread_mutex_lock(&pContext->mutex); - // locate the pObj; - SCqObj *prev = NULL; - SCqObj *pObj = pContext->pHead; - while (pObj) { - if (pObj->sid != sid) { - prev = pObj; - pObj = pObj->next; - continue; - } - - // remove from the linked list - if (prev) { - prev->next = pObj->next; - } else { - pContext->pHead = pObj->next; - } + if (pObj->prev) { + pObj->prev->next = pObj->next; + } else { + pContext->pHead = pObj->next; + } - break; + if (pObj->next) { + pObj->next->prev = pObj->prev; } - if (pObj) { - // update the meta data + // free the resources associated + if (pObj->pStream) taos_close_stream(pObj->pStream); + pObj->pStream = NULL; - // free the resources associated - if (pObj->pStream) taos_close_stream(pObj->pStream); - pObj->pStream = NULL; - - cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->sid, pObj->sqlStr); - free(pObj); - } + cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); + free(pObj); pthread_mutex_lock(&pContext->mutex); } @@ -252,7 +223,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SCqContext *pContext = pObj->pContext; if (pObj->pStream == NULL) return; - cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->sid, pObj->sqlStr); + cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); // construct data int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize; @@ -269,11 +240,10 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); // to do: fill in the SSubmitBlk strucuture - pBlk->tid = pObj->sid; + pBlk->tid = pObj->tid; // write into vnode write queue pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); - } diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index b0c6ca3178a3ed24ae2d93eff03f0ea0acb0e568..f620f443823fbc0d18560774604d785272c83409 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -29,16 +29,16 @@ int writeToQueue(void *pVnode, void *data, int type) { } int main(int argc, char *argv[]) { - char path[128] = "~/cq"; + int num = 3; for (int i=1; iwqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + SCqCfg cqCfg; + sprintf(cqCfg.user, "root"); + strcpy(cqCfg.pass, tsInternalPass); + cqCfg.cqWrite = vnodeWriteToQueue; + pVnode->cq = cqOpen(pVnode, &cqCfg); + STsdbAppH appH = {0}; appH.appH = (void *)pVnode; appH.walCallBack = vnodeWalCallback; + appH.cqH = pVnode->cq; sprintf(temp, "%s/tsdb", rootDir); pVnode->tsdb = tsdbOpenRepo(temp, &appH); @@ -210,12 +217,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->wal = walOpen(temp, &pVnode->walCfg); walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); - SCqCfg cqCfg; - sprintf(cqCfg.path, "%s/cq", rootDir); - strcpy(cqCfg.pass, tsInternalPass); - cqCfg.cqWrite = vnodeWriteToQueue; - pVnode->cq = cqOpen(pVnode, &cqCfg); - SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; syncInfo.version = pVnode->version; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a176468e853271c4187cea5956fa1c6184419a0b..77ed65c161d5ac68a7f5a67685f67fe682f52815 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -114,7 +114,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfTags = htons(pTable->numOfTags); int32_t sid = htonl(pTable->sid); - int32_t sqlDataLen = htonl(pTable->sqlDataLen); uint64_t uid = htobe64(pTable->uid); SSchema *pSchema = (SSchema *) pTable->data; @@ -151,14 +150,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe } code = tsdbCreateTable(pVnode->tsdb, &tCfg); - - if (code == 0 && sqlDataLen >0) { - char *sqlStr = NULL; - // to do: get the sqlStr - - cqCreate(pVnode->cq, sid, sqlStr, pSchema, numOfColumns); - } - tfree(pDestSchema); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code); @@ -176,7 +167,6 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet }; code = tsdbDropTable(pVnode->tsdb, tableId); - cqDrop(pVnode->cq, tableId.tid); return code; }