From 2831779eb36548808772d6a3a6ea9ca695cc4ad7 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 29 May 2020 02:30:51 +0000 Subject: [PATCH] TD-354 --- src/cq/src/cqMain.c | 35 ++++++++++++++++------------------- src/cq/test/cqtest.c | 17 ++++++----------- src/inc/tcq.h | 3 ++- src/inc/tsdb.h | 2 ++ src/tsdb/inc/tsdbMain.h | 3 ++- src/tsdb/src/tsdbMain.c | 2 +- src/tsdb/src/tsdbMeta.c | 17 +++++++++++++++-- src/vnode/src/vnodeMain.c | 4 ++++ 8 files changed, 48 insertions(+), 35 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 6e81db7db7..9406a2fdce 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -15,17 +15,19 @@ #define _DEFAULT_SOURCE +#include +#include #include #include -#include -#include + +#include "taos.h" #include "taosdef.h" #include "taosmsg.h" +#include "tcq.h" +#include "tdataformat.h" #include "tglobal.h" #include "tlog.h" #include "twal.h" -#include "tcq.h" -#include "taos.h" #define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__); }} #define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__); }} @@ -46,15 +48,14 @@ typedef struct { } SCqContext; typedef struct SCqObj { - int tid; // table ID - int rowSize; // bytes of a row - char *sqlStr; // SQL string - int columns; // number of columns - SSchema *pSchema; // pointer to schema array - void *pStream; - struct SCqObj *prev; - struct SCqObj *next; - SCqContext *pContext; + int tid; // table ID + int rowSize; // bytes of a row + char * sqlStr; // SQL string + STSchema * pSchema; // pointer to schema array + void * pStream; + struct SCqObj *prev; + struct SCqObj *next; + SCqContext * pContext; } SCqObj; int cqDebugFlag = 135; @@ -152,7 +153,7 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) { +void *cqCreate(void *handle, int tid, char *sqlStr, STSchema *pSchema) { SCqContext *pContext = handle; SCqObj *pObj = calloc(sizeof(SCqObj), 1); @@ -162,11 +163,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column pObj->sqlStr = malloc(strlen(sqlStr)+1); strcpy(pObj->sqlStr, sqlStr); - pObj->columns = columns; - - int size = sizeof(SSchema) * columns; - pObj->pSchema = malloc(size); - memcpy(pObj->pSchema, pSchema, size); + pObj->pSchema = tdDupSchema(pSchema); cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index 7977bd85bc..3aa649ee34 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -59,21 +59,16 @@ int main(int argc, char *argv[]) { exit(-1); } - SSchema schema[2]; - schema[0].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(schema[0].name, "ts"); - schema[0].colId = 0; - schema[0].bytes = 8; - - schema[1].type = TSDB_DATA_TYPE_INT; - strcpy(schema[1].name, "avgspeed"); - schema[1].colId = 1; - schema[1].bytes = 4; + STSchema *pSchema = tdNewSchema(2); + tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_TIMESTAMP, 0, 8); + tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_INT, 1, 4); for (int sid =1; sid<10; ++sid) { - cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", schema, 2); + cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); } + tdFreeSchema(pSchema); + while (1) { char c = getchar(); diff --git a/src/inc/tcq.h b/src/inc/tcq.h index d0a2097c05..e025afaa0a 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -19,6 +19,7 @@ extern "C" { #endif +#include "tdataformat.h" typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); @@ -40,7 +41,7 @@ void cqStart(void *handle); void cqStop(void *handle); // cqCreate is called by TSDB to start an instance of CQ -void *cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns); +void *cqCreate(void *handle, int sid, char *sqlStr, STSchema *pSchema); // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate void cqDrop(void *handle); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 341dee1476..63539e856d 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -43,6 +43,8 @@ typedef struct { void *cqH; int (*notifyStatus)(void *, int status); int (*eventCallBack)(void *); + void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema); + void (*cqDropFunc)(void *handle); } STsdbAppH; // --------- TSDB REPOSITORY CONFIGURATION DEFINITION diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 0839e0f8ff..0b7c878f23 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -110,6 +110,7 @@ typedef struct { SMetaFile *mfh; // meta file handle int maxRowBytes; int maxCols; + void * pRepo; } STsdbMeta; // element put in skiplist for each table @@ -118,7 +119,7 @@ typedef struct STableIndexElem { STable* pTable; } STableIndexElem; -STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables); +STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo); int32_t tsdbFreeMeta(STsdbMeta *pMeta); STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index bddb3fcaff..9fbd822a65 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -205,7 +205,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { tsdbRestoreCfg(pRepo, &(pRepo->config)); if (pAppH) pRepo->appH = *pAppH; - pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables); + pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables, pRepo); if (pRepo->tsdbMeta == NULL) { free(pRepo->rootDir); free(pRepo); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index e320de9827..7d71a0ff3d 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -142,6 +142,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { void tsdbOrgMeta(void *pHandle) { STsdbMeta *pMeta = (STsdbMeta *)pHandle; + STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo; for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; @@ -149,13 +150,20 @@ void tsdbOrgMeta(void *pHandle) { tsdbAddTableIntoIndex(pMeta, pTable); } } + + for (int i = 0; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->type == TSDB_STREAM_TABLE) { + (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, i, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); + } + } } /** * Initialize the meta handle * ASSUMPTIONS: VALID PARAMETER */ -STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { +STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo) { STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); if (pMeta == NULL) return NULL; @@ -165,6 +173,7 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *)); pMeta->maxRowBytes = 0; pMeta->maxCols = 0; + pMeta->pRepo = pRepo; if (pMeta->tables == NULL) { free(pMeta); return NULL; @@ -189,10 +198,13 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { } int32_t tsdbFreeMeta(STsdbMeta *pMeta) { + STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo; if (pMeta == NULL) return 0; tsdbCloseMetaFile(pMeta->mfh); + (*pRepo->appH.cqDropFunc)(pRepo->appH.cqH); + for (int i = 1; i < pMeta->maxTables; i++) { if (pMeta->tables[i] != NULL) { tsdbFreeTable(pMeta->tables[i]); @@ -512,6 +524,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { } static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { + STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo; if (pTable->type == TSDB_SUPER_TABLE) { // add super table to the linked list if (pMeta->superList == NULL) { @@ -531,7 +544,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { tsdbAddTableIntoIndex(pMeta, pTable); } if (pTable->type == TSDB_STREAM_TABLE && addIdx) { - // TODO + (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); } pMeta->nTables++; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b25180f0f0..d96e19a3f1 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -220,6 +220,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; + appH.cqCreateFunc = cqCreate; + appH.cqDropFunc = cqDrop; sprintf(temp, "%s/tsdb", rootDir); pVnode->tsdb = tsdbOpenRepo(temp, &appH); if (pVnode->tsdb == NULL) { @@ -467,6 +469,8 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; + appH.cqCreateFunc = cqCreate; + appH.cqDropFunc = cqDrop; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); } -- GitLab