提交 2831779e 编写于 作者: H Hongze Cheng

TD-354

上级 6a7da09d
......@@ -15,17 +15,19 @@
#define _DEFAULT_SOURCE
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tcq.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tlog.h"
#include "twal.h"
#include "tcq.h"
#include "taos.h"
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__); }}
......@@ -46,15 +48,14 @@ typedef struct {
} SCqContext;
typedef struct SCqObj {
int tid; // table ID
int rowSize; // bytes of a row
char *sqlStr; // SQL string
int columns; // number of columns
SSchema *pSchema; // pointer to schema array
void *pStream;
struct SCqObj *prev;
struct SCqObj *next;
SCqContext *pContext;
int tid; // table ID
int rowSize; // bytes of a row
char * sqlStr; // SQL string
STSchema * pSchema; // pointer to schema array
void * pStream;
struct SCqObj *prev;
struct SCqObj *next;
SCqContext * pContext;
} SCqObj;
int cqDebugFlag = 135;
......@@ -152,7 +153,7 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex);
}
void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) {
void *cqCreate(void *handle, int tid, char *sqlStr, STSchema *pSchema) {
SCqContext *pContext = handle;
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
......@@ -162,11 +163,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column
pObj->sqlStr = malloc(strlen(sqlStr)+1);
strcpy(pObj->sqlStr, sqlStr);
pObj->columns = columns;
int size = sizeof(SSchema) * columns;
pObj->pSchema = malloc(size);
memcpy(pObj->pSchema, pSchema, size);
pObj->pSchema = tdDupSchema(pSchema);
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
......
......@@ -59,21 +59,16 @@ int main(int argc, char *argv[]) {
exit(-1);
}
SSchema schema[2];
schema[0].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(schema[0].name, "ts");
schema[0].colId = 0;
schema[0].bytes = 8;
schema[1].type = TSDB_DATA_TYPE_INT;
strcpy(schema[1].name, "avgspeed");
schema[1].colId = 1;
schema[1].bytes = 4;
STSchema *pSchema = tdNewSchema(2);
tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_INT, 1, 4);
for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", schema, 2);
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
}
tdFreeSchema(pSchema);
while (1) {
char c = getchar();
......
......@@ -19,6 +19,7 @@
extern "C" {
#endif
#include "tdataformat.h"
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type);
......@@ -40,7 +41,7 @@ void cqStart(void *handle);
void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns);
void *cqCreate(void *handle, int sid, char *sqlStr, STSchema *pSchema);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle);
......
......@@ -43,6 +43,8 @@ typedef struct {
void *cqH;
int (*notifyStatus)(void *, int status);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle);
} STsdbAppH;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
......
......@@ -110,6 +110,7 @@ typedef struct {
SMetaFile *mfh; // meta file handle
int maxRowBytes;
int maxCols;
void * pRepo;
} STsdbMeta;
// element put in skiplist for each table
......@@ -118,7 +119,7 @@ typedef struct STableIndexElem {
STable* pTable;
} STableIndexElem;
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables);
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo);
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
......
......@@ -205,7 +205,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
tsdbRestoreCfg(pRepo, &(pRepo->config));
if (pAppH) pRepo->appH = *pAppH;
pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables);
pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables, pRepo);
if (pRepo->tsdbMeta == NULL) {
free(pRepo->rootDir);
free(pRepo);
......
......@@ -142,6 +142,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
void tsdbOrgMeta(void *pHandle) {
STsdbMeta *pMeta = (STsdbMeta *)pHandle;
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
......@@ -149,13 +150,20 @@ void tsdbOrgMeta(void *pHandle) {
tsdbAddTableIntoIndex(pMeta, pTable);
}
}
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
(*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, i, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
}
}
}
/**
* Initialize the meta handle
* ASSUMPTIONS: VALID PARAMETER
*/
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo) {
STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
if (pMeta == NULL) return NULL;
......@@ -165,6 +173,7 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));
pMeta->maxRowBytes = 0;
pMeta->maxCols = 0;
pMeta->pRepo = pRepo;
if (pMeta->tables == NULL) {
free(pMeta);
return NULL;
......@@ -189,10 +198,13 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
}
int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
if (pMeta == NULL) return 0;
tsdbCloseMetaFile(pMeta->mfh);
(*pRepo->appH.cqDropFunc)(pRepo->appH.cqH);
for (int i = 1; i < pMeta->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbFreeTable(pMeta->tables[i]);
......@@ -512,6 +524,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
}
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
if (pTable->type == TSDB_SUPER_TABLE) {
// add super table to the linked list
if (pMeta->superList == NULL) {
......@@ -531,7 +544,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
tsdbAddTableIntoIndex(pMeta, pTable);
}
if (pTable->type == TSDB_STREAM_TABLE && addIdx) {
// TODO
(*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
}
pMeta->nTables++;
......
......@@ -220,6 +220,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq;
appH.cqCreateFunc = cqCreate;
appH.cqDropFunc = cqDrop;
sprintf(temp, "%s/tsdb", rootDir);
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
if (pVnode->tsdb == NULL) {
......@@ -467,6 +469,8 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq;
appH.cqCreateFunc = cqCreate;
appH.cqDropFunc = cqDrop;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册