diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 904050fb1bc255db98bf5bb59f9f59adc3cedac8..57d723437981205458d32357c8a05ce9ba46e451 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,5 +14,6 @@ ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(vnode) ADD_SUBDIRECTORY(tsdb) ADD_SUBDIRECTORY(wal) +ADD_SUBDIRECTORY(cq) ADD_SUBDIRECTORY(dnode) ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..d41ae09a58a8942e2f033237815060f466ee49d1 --- /dev/null +++ b/src/cq/CMakeLists.txt @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(inc) + +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) + +ADD_LIBRARY(tcq ${SRC}) +TARGET_LINK_LIBRARIES(tcq tutil common taos) + +ADD_SUBDIRECTORY(test) + diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index e5bbf8353a806c22e95a45ff0c730496444c7434..e3df73a883aa6e5bd67fe508399a813db6f88adf 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -14,154 +14,266 @@ */ #define _DEFAULT_SOURCE + +#include +#include +#include +#include "taosdef.h" #include "taosmsg.h" -#include "vnode.h" +#include "tlog.h" +#include "twal.h" +#include "tcq.h" +#include "taos.h" + +#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);} +#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);} +#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);} +#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);} + +typedef struct { + int vgId; + char path[TSDB_FILENAME_LEN]; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + FCqWrite cqWrite; + void *ahandle; + int num; // number of continuous streams + struct SCqObj *pHead; + void *dbConn; + pthread_mutex_t mutex; +} SCqContext; + +typedef struct SCqObj { + int sid; // table ID + int rowSize; // bytes of a row + char *sqlStr; // SQL string + int columns; // number of columns + SSchema *pSchema; // pointer to schema array + void *pStream; + struct SCqObj *next; + SCqContext *pContext; +} SCqObj; + +int cqDebugFlag = 135; + +static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); + +void *cqOpen(void *ahandle, const SCqCfg *pCfg) { + + SCqContext *pContext = calloc(sizeof(SCqContext), 1); + if (pContext == NULL) return NULL; + + strcpy(pContext->user, pCfg->user); + strcpy(pContext->pass, pCfg->pass); + strcpy(pContext->path, pCfg->path); + pContext->vgId = pCfg->vgId; + pContext->cqWrite = pCfg->cqWrite; + pContext->ahandle = ahandle; + + // open meta data file + + // loop each record + while (1) { + SCqObj *pObj = calloc(sizeof(SCqObj), 1); + if (pObj == NULL) { + cError("vgId:%d, no memory", pContext->vgId); + continue; + } -/* static TAOS *dbConn = NULL; */ -void vnodeCloseStreamCallback(void *param); + pObj->next = pContext->pHead; + pContext->pHead = pObj; -void cqOpen(void *param, void *tmrId) { - SVnodeObj *pVnode = (SVnodeObj *)param; - SMeterObj *pObj; + // assigne each field in SCqObj + // pObj->sid = + // strcpy(pObj->sqlStr, ?? ); + // schema, columns + } - if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; - if (pVnode->meterList == NULL) return; + pthread_mutex_init(&pContext->mutex, NULL); - taosTmrStopA(&pVnode->streamTimer); - pVnode->streamTimer = NULL; + cTrace("vgId:%d, CQ is opened", pContext->vgId); - for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { - pObj = pVnode->meterList[sid]; - if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue; + return pContext; +} - dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql); +void cqClose(void *handle) { + SCqContext *pContext = handle; - if (pVnode->dbConn == NULL) { - char db[64] = {0}; - char user[64] = {0}; - vnodeGetDBFromMeterId(pObj, db); - sprintf(user, "_%s", pVnode->cfg.acct); - pVnode->dbConn = taos_connect(NULL, user, tsInternalPass, db, 0); - } + // stop all CQs + cqStop(pContext); - if (pVnode->dbConn == NULL) { - dError("vid:%d, failed to connect to mgmt node", pVnode->vnode); - taosTmrReset(vnodeOpenStreams, 1000, param, vnodeTmrCtrl, &pVnode->streamTimer); - return; - } + // save the meta data - if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, - vnodeCloseStreamCallback); - if (pObj->pStream) pVnode->numOfStreams++; - } - } + // free all resources + SCqObj *pObj = pContext->pHead; + while (pObj) { + SCqObj *pTemp = pObj; + pObj = pObj->next; + free(pTemp); + } + + pthread_mutex_destroy(&pContext->mutex); + + cTrace("vgId:%d, CQ is closed", pContext->vgId); + free(pContext); } -// Close all streams in a vnode -void cqClose(SVnodeObj *pVnode) { - SMeterObj *pObj; - dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); - - // stop stream computing - for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { - pObj = pVnode->meterList[sid]; - if (pObj == NULL) continue; - if (pObj->sqlLen > 0 && pObj->pStream) { - taos_close_stream(pObj->pStream); - pVnode->numOfStreams--; +void cqStart(void *handle) { + SCqContext *pContext = handle; + cTrace("vgId:%d, start all CQs", pContext->vgId); + if (pContext->dbConn) return; + + pthread_mutex_lock(&pContext->mutex); + + pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); + if (pContext->dbConn) { + cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); + pthread_mutex_unlock(&pContext->mutex); + return; + } + + + SCqObj *pObj = pContext->pHead; + while (pObj) { + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->sqlStr); } - pObj->pStream = NULL; + pObj = pObj->next; } + + pthread_mutex_unlock(&pContext->mutex); } -void cqCreate(SMeterObj *pObj) { - if (pObj->sqlLen <= 0) return; +void cqStop(void *handle) { + SCqContext *pContext = handle; + cTrace("vgId:%d, stop all CQs", pContext->vgId); + if (pContext->dbConn == NULL) return; - SVnodeObj *pVnode = vnodeList + pObj->vnode; + pthread_mutex_lock(&pContext->mutex); - if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; - if (pObj->pStream) return; + SCqObj *pObj = pContext->pHead; + while (pObj) { + if (pObj->pStream) taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->sid, pObj->sqlStr); - dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql); - if (pVnode->dbConn == NULL) { - if (pVnode->streamTimer == NULL) taosTmrReset(vnodeOpenStreams, 1000, pVnode, vnodeTmrCtrl, &pVnode->streamTimer); - } else { - pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, - vnodeCloseStreamCallback); - if (pObj->pStream) pVnode->numOfStreams++; + pObj = pObj->next; } + + if (pContext->dbConn) taos_close(pContext->dbConn); + pContext->dbConn = NULL; + + pthread_mutex_unlock(&pContext->mutex); } -// Close only one stream -void cqDrop(SMeterObj *pObj) { - SVnodeObj *pVnode = vnodeList + pObj->vnode; - if (pObj->sqlLen <= 0) return; +void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns) { + SCqContext *pContext = handle; - if (pObj->pStream) { - taos_close_stream(pObj->pStream); - pVnode->numOfStreams--; - } + SCqObj *pObj = calloc(sizeof(SCqObj), 1); + if (pObj == NULL) return; - pObj->pStream = NULL; - if (pVnode->numOfStreams == 0) { - taos_close(pVnode->dbConn); - pVnode->dbConn = NULL; - } + pObj->sid = sid; + pObj->sqlStr = malloc(strlen(sqlStr)+1); + strcpy(pObj->sqlStr, sqlStr); - dTrace("vid:%d sid:%d id:%d stream is removed", pObj->vnode, pObj->sid, pObj->meterId); -} + pObj->columns = columns; -void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { - SMeterObj *pObj = (SMeterObj *)param; - dTrace("vid:%d sid:%d id:%s, stream result is ready", pObj->vnode, pObj->sid, pObj->meterId); + int size = sizeof(SSchema) * columns; + pObj->pSchema = malloc(size); + memcpy(pObj->pSchema, pSchema, size); - // construct data - int32_t contLen = pObj->bytesPerPoint; - char * pTemp = calloc(1, sizeof(SSubmitMsg) + pObj->bytesPerPoint + sizeof(SVMsgHeader)); - SSubmitMsg *pMsg = (SSubmitMsg *)(pTemp + sizeof(SVMsgHeader)); + cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->sid, pObj->sqlStr); - pMsg->numOfRows = htons(1); + pthread_mutex_lock(&pContext->mutex); - char ncharBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; + pObj->next = pContext->pHead; + pContext->pHead = pObj; - int32_t offset = 0; - for (int32_t i = 0; i < pObj->numOfColumns; ++i) { - char *dst = row[i]; - if (dst == NULL) { - setNull(pMsg->payLoad + offset, pObj->schema[i].type, pObj->schema[i].bytes); + if (pContext->dbConn) { + int64_t lastKey = 0; + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->sid, pObj->sqlStr); } else { - // here, we need to transfer nchar(utf8) to unicode(ucs-4) - if (pObj->schema[i].type == TSDB_DATA_TYPE_NCHAR) { - taosMbsToUcs4(row[i], pObj->schema[i].bytes, ncharBuf, TSDB_MAX_BYTES_PER_ROW); - dst = ncharBuf; - } + cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->sid, pObj->sqlStr); + } + } - memcpy(pMsg->payLoad + offset, dst, pObj->schema[i].bytes); + pthread_mutex_unlock(&pContext->mutex); +} + +void cqDrop(void *handle, int sid) { + SCqContext *pContext = handle; + + pthread_mutex_lock(&pContext->mutex); + + // locate the pObj; + SCqObj *prev = NULL; + SCqObj *pObj = pContext->pHead; + while (pObj) { + if (pObj->sid != sid) { + prev = pObj; + pObj = pObj->next; + continue; } - offset += pObj->schema[i].bytes; + // remove from the linked list + if (prev) { + prev->next = pObj->next; + } else { + pContext->pHead = pObj->next; + } + + break; } - contLen += sizeof(SSubmitMsg); + if (pObj) { + // update the meta data - int32_t numOfPoints = 0; - int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, - &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); + // free the resources associated + if (pObj->pStream) taos_close_stream(pObj->pStream); + pObj->pStream = NULL; - if (code != TSDB_CODE_SUCCESS) { - dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId); + cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->sid, pObj->sqlStr); + free(pObj); } - assert(numOfPoints >= 0 && numOfPoints <= 1); - tfree(pTemp); + pthread_mutex_lock(&pContext->mutex); } -static void vnodeGetDBFromMeterId(SMeterObj *pObj, char *db) { - char *st = strstr(pObj->meterId, "."); - char *end = strstr(st + 1, "."); +static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { + SCqObj *pObj = (SCqObj *)param; + SCqContext *pContext = pObj->pContext; + if (pObj->pStream == NULL) return; - memcpy(db, st + 1, end - (st + 1)); -} + cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->sid, pObj->sqlStr); + + // construct data + int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize; + char *buffer = calloc(size, 1); + + SWalHead *pHead = (SWalHead *)buffer; + pHead->msgType = TSDB_MSG_TYPE_SUBMIT; + pHead->len = size - sizeof(SWalHead); + + SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead)); + // to do: fill in the SSubmitMsg structure + pSubmit->numOfBlocks = 1; + SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); + // to do: fill in the SSubmitBlk strucuture + pBlk->tid = pObj->sid; + + + // write into vnode write queue + pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); + +} + diff --git a/src/cq/src/vnodeStream.c b/src/cq/src/vnodeStream.c deleted file mode 100644 index 1a8611fdabe651ee81919634e50c551fcd99fb49..0000000000000000000000000000000000000000 --- a/src/cq/src/vnodeStream.c +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "taosmsg.h" -#include "vnode.h" -#include "vnodeUtil.h" -#include "vnodeStatus.h" - -/* static TAOS *dbConn = NULL; */ -void vnodeCloseStreamCallback(void *param); - -void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { - SMeterObj *pObj = (SMeterObj *)param; - dTrace("vid:%d sid:%d id:%s, stream result is ready", pObj->vnode, pObj->sid, pObj->meterId); - - // construct data - int32_t contLen = pObj->bytesPerPoint; - char * pTemp = calloc(1, sizeof(SSubmitMsg) + pObj->bytesPerPoint + sizeof(SVMsgHeader)); - SSubmitMsg *pMsg = (SSubmitMsg *)(pTemp + sizeof(SVMsgHeader)); - - pMsg->numOfRows = htons(1); - - char ncharBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; - - int32_t offset = 0; - for (int32_t i = 0; i < pObj->numOfColumns; ++i) { - char *dst = row[i]; - if (dst == NULL) { - setNull(pMsg->payLoad + offset, pObj->schema[i].type, pObj->schema[i].bytes); - } else { - // here, we need to transfer nchar(utf8) to unicode(ucs-4) - if (pObj->schema[i].type == TSDB_DATA_TYPE_NCHAR) { - taosMbsToUcs4(row[i], pObj->schema[i].bytes, ncharBuf, TSDB_MAX_BYTES_PER_ROW); - dst = ncharBuf; - } - - memcpy(pMsg->payLoad + offset, dst, pObj->schema[i].bytes); - } - - offset += pObj->schema[i].bytes; - } - - contLen += sizeof(SSubmitMsg); - - int32_t numOfPoints = 0; - int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, - &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); - - if (code != TSDB_CODE_SUCCESS) { - dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId); - } - - assert(numOfPoints >= 0 && numOfPoints <= 1); - tfree(pTemp); -} - -static void vnodeGetDBFromMeterId(SMeterObj *pObj, char *db) { - char *st = strstr(pObj->meterId, "."); - char *end = strstr(st + 1, "."); - - memcpy(db, st + 1, end - (st + 1)); -} - -void vnodeOpenStreams(void *param, void *tmrId) { - SVnodeObj *pVnode = (SVnodeObj *)param; - SMeterObj *pObj; - - if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; - if (pVnode->meterList == NULL) return; - - taosTmrStopA(&pVnode->streamTimer); - pVnode->streamTimer = NULL; - - for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { - pObj = pVnode->meterList[sid]; - if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue; - - dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql); - - if (pVnode->dbConn == NULL) { - char db[64] = {0}; - char user[64] = {0}; - vnodeGetDBFromMeterId(pObj, db); - sprintf(user, "_%s", pVnode->cfg.acct); - pVnode->dbConn = taos_connect(NULL, user, tsInternalPass, db, 0); - } - - if (pVnode->dbConn == NULL) { - dError("vid:%d, failed to connect to mgmt node", pVnode->vnode); - taosTmrReset(vnodeOpenStreams, 1000, param, vnodeTmrCtrl, &pVnode->streamTimer); - return; - } - - if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, - vnodeCloseStreamCallback); - if (pObj->pStream) pVnode->numOfStreams++; - } - } -} - -void vnodeCreateStream(SMeterObj *pObj) { - if (pObj->sqlLen <= 0) return; - - SVnodeObj *pVnode = vnodeList + pObj->vnode; - - if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return; - if (pObj->pStream) return; - - dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql); - if (pVnode->dbConn == NULL) { - if (pVnode->streamTimer == NULL) taosTmrReset(vnodeOpenStreams, 1000, pVnode, vnodeTmrCtrl, &pVnode->streamTimer); - } else { - pObj->pStream = taos_open_stream(pVnode->dbConn, pObj->pSql, vnodeProcessStreamRes, pObj->lastKey, pObj, - vnodeCloseStreamCallback); - if (pObj->pStream) pVnode->numOfStreams++; - } -} - -// Close only one stream -void vnodeRemoveStream(SMeterObj *pObj) { - SVnodeObj *pVnode = vnodeList + pObj->vnode; - if (pObj->sqlLen <= 0) return; - - if (pObj->pStream) { - taos_close_stream(pObj->pStream); - pVnode->numOfStreams--; - } - - pObj->pStream = NULL; - if (pVnode->numOfStreams == 0) { - taos_close(pVnode->dbConn); - pVnode->dbConn = NULL; - } - - dTrace("vid:%d sid:%d id:%d stream is removed", pObj->vnode, pObj->sid, pObj->meterId); -} - -// Close all streams in a vnode -void vnodeCloseStream(SVnodeObj *pVnode) { - SMeterObj *pObj; - dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); - - // stop stream computing - for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { - pObj = pVnode->meterList[sid]; - if (pObj == NULL) continue; - if (pObj->sqlLen > 0 && pObj->pStream) { - taos_close_stream(pObj->pStream); - pVnode->numOfStreams--; - } - pObj->pStream = NULL; - } -} - -void vnodeUpdateStreamRole(SVnodeObj *pVnode) { - /* SMeterObj *pObj; */ - - int newRole = (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) ? TSDB_VN_STREAM_STATUS_START : TSDB_VN_STREAM_STATUS_STOP; - if (newRole != pVnode->streamRole) { - dPrint("vid:%d, stream role is changed from %s to %s", - pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole), taosGetVnodeStreamStatusStr(newRole)); - pVnode->streamRole = newRole; - if (newRole == TSDB_VN_STREAM_STATUS_START) { - vnodeOpenStreams(pVnode, NULL); - } else { - vnodeCloseStream(pVnode); - } - } else { - dPrint("vid:%d, stream role is keep to %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole)); - } -} - -// Callback function called from client -void vnodeCloseStreamCallback(void *param) { - SMeterObj *pMeter = (SMeterObj *)param; - SVnodeObj *pVnode = NULL; - - if (pMeter == NULL || pMeter->sqlLen == 0) return; - pVnode = vnodeList + pMeter->vnode; - - pMeter->sqlLen = 0; - pMeter->pSql = NULL; - pMeter->pStream = NULL; - - pVnode->numOfStreams--; - - if (pVnode->numOfStreams == 0) { - taos_close(pVnode->dbConn); - pVnode->dbConn = NULL; - } - - vnodeSaveMeterObjToFile(pMeter); -} - - diff --git a/src/cq/test/CMakeLists.txt b/src/cq/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..99c729dff46f695e945b747a499ed809c4da2b31 --- /dev/null +++ b/src/cq/test/CMakeLists.txt @@ -0,0 +1,17 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND CQTEST_SRC ./cqtest.c) + ADD_EXECUTABLE(cqtest ${CQTEST_SRC}) + TARGET_LINK_LIBRARIES(cqtest tcq) + +ENDIF () + + diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c new file mode 100644 index 0000000000000000000000000000000000000000..b0c6ca3178a3ed24ae2d93eff03f0ea0acb0e568 --- /dev/null +++ b/src/cq/test/cqtest.c @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "taosdef.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "tlog.h" +#include "tcq.h" + +int64_t ver = 0; +void *pCq = NULL; + +int writeToQueue(void *pVnode, void *data, int type) { + return 0; +} + +int main(int argc, char *argv[]) { + char path[128] = "~/cq"; + + for (int i=1; i + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_CQ_H_ +#define _TD_CQ_H_ + +#ifdef __cplusplus +extern "C" { +#endif + + +typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); + +typedef struct { + int vgId; + char path[TSDB_FILENAME_LEN]; + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + FCqWrite cqWrite; +} SCqCfg; + +void *cqOpen(void *ahandle, const SCqCfg *pCfg); +void cqClose(void *handle); +void cqStart(void *handle); +void cqStop(void *handle); +void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns); +void cqDrop(void *handle, int sid); + +extern int cqDebugFlag; + + +#ifdef __cplusplus +} +#endif + +#endif // _TD_CQ_H_ diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index c45eb10518765e65142eabd3294c8cc851331f3a..f4086dcd126e5961383feb5368517f670b561489 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -20,10 +20,6 @@ extern "C" { #endif -#define TAOS_QTYPE_RPC 0 -#define TAOS_QTYPE_FWD 1 -#define TAOS_QTYPE_WAL 2 - typedef void* taos_queue; typedef void* taos_qset; typedef void* taos_qall; diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 6ceb83cb45f2e06cf46fb999a57025dc5453dc03..a1c56b32b581144bd49c1b980aa1dbf59b690ce0 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(vnode ${SRC}) - TARGET_LINK_LIBRARIES(vnode tsdb) -ENDIF () \ No newline at end of file + TARGET_LINK_LIBRARIES(vnode tsdb tcq) +ENDIF () diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 1302ceaff4bd814e163b6048a50b98d90d7f8754..3ee0083cb37dc1bf3fe6b192a4523b350c913ed5 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -30,6 +30,8 @@ #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" +#include "tcq.h" +//#include "tsync.h" static int32_t tsOpennedVnodes; static void *tsDnodeVnodesHash; @@ -192,8 +194,27 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + STsdbAppH appH = {0}; + appH.appH = (void *)pVnode; + appH.walCallBack = vnodeWalCallback; + + sprintf(temp, "%s/tsdb", rootDir); + pVnode->tsdb = tsdbOpenRepo(temp, &appH); + if (pVnode->tsdb == NULL) { + dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return terrno; + } + sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); + + SCqCfg cqCfg; + sprintf(cqCfg.path, "%s/cq", rootDir); + strcpy(cqCfg.pass, tsInternalPass); + cqCfg.cqWrite = vnodeWriteToQueue; + pVnode->cq = cqOpen(pVnode, &cqCfg); SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; @@ -208,24 +229,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyRole = vnodeNotifyRole; pVnode->sync = syncStart(&syncInfo); - pVnode->events = NULL; - pVnode->cq = NULL; + // start continuous query + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + cqStart(pVnode->cq); - STsdbAppH appH = {0}; - appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWalCallback; - - sprintf(temp, "%s/tsdb", rootDir); - void *pTsdb = tsdbOpenRepo(temp, &appH); - if (pTsdb == NULL) { - dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; - } - - pVnode->tsdb = pTsdb; - - walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); + pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); @@ -350,10 +358,16 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { pVnode->sync = NULL; } + cqClose(pVnode->cq); + pVnode->cq = NULL; + tsdbCloseRepo(pVnode->tsdb); + pVnode->tsdb = NULL; + walClose(pVnode->wal); - vnodeSaveVersion(pVnode); + pVnode->wal = NULL; + vnodeSaveVersion(pVnode); vnodeRelease(pVnode); } @@ -377,6 +391,11 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; pVnode->role = role; + + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + cqStart(pVnode->cq); + else + cqStop(pVnode->cq); } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index b1a49e6e6535d52eb70d92c348ef1d87642243b1..a176468e853271c4187cea5956fa1c6184419a0b 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -26,6 +26,7 @@ #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" +#include "tcq.h" static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); @@ -113,6 +114,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfTags = htons(pTable->numOfTags); int32_t sid = htonl(pTable->sid); + int32_t sqlDataLen = htonl(pTable->sqlDataLen); uint64_t uid = htobe64(pTable->uid); SSchema *pSchema = (SSchema *) pTable->data; @@ -150,6 +152,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe code = tsdbCreateTable(pVnode->tsdb, &tCfg); + if (code == 0 && sqlDataLen >0) { + char *sqlStr = NULL; + // to do: get the sqlStr + + cqCreate(pVnode->cq, sid, sqlStr, pSchema, numOfColumns); + } + tfree(pDestSchema); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code); @@ -167,6 +176,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet }; code = tsdbDropTable(pVnode->tsdb, tableId); + cqDrop(pVnode->cq, tableId.tid); return code; }