提交 69142ea5 编写于 作者: B Bomin Zhang

TD-449: data is written to TSDB

上级 d34cabf2
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tscLog.h" #include "tscLog.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsched.h" #include "tsched.h"
#include "tcache.h"
#include "tsclient.h" #include "tsclient.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -147,7 +148,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -147,7 +148,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
retryDelay); retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
tscClearTableMetaInfo(pTableMetaInfo, true); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), true);
tfree(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay); tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return; return;
...@@ -259,7 +261,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -259,7 +261,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream->numOfRes); pStream->numOfRes);
// release the metric/meter meta information reference, so data in cache can be updated // release the metric/meter meta information reference, so data in cache can be updated
tscClearTableMetaInfo(pTableMetaInfo, false);
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
tfree(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql); tscSetNextLaunchTimer(pStream, pSql);
} }
} }
......
...@@ -70,6 +70,7 @@ typedef struct { ...@@ -70,6 +70,7 @@ typedef struct {
int numOfCols; // Number of columns appended int numOfCols; // Number of columns appended
int tlen; // maximum length of a SDataRow without the header part int tlen; // maximum length of a SDataRow without the header part
int flen; // First part length in a SDataRow after the header part int flen; // First part length in a SDataRow after the header part
int32_t version;
STColumn columns[]; STColumn columns[];
} STSchema; } STSchema;
......
...@@ -49,7 +49,8 @@ typedef struct { ...@@ -49,7 +49,8 @@ typedef struct {
} SCqContext; } SCqContext;
typedef struct SCqObj { typedef struct SCqObj {
int tid; // table ID uint64_t uid;
int32_t tid; // table ID
int rowSize; // bytes of a row int rowSize; // bytes of a row
char * sqlStr; // SQL string char * sqlStr; // SQL string
STSchema * pSchema; // pointer to schema array STSchema * pSchema; // pointer to schema array
...@@ -155,17 +156,19 @@ void cqStop(void *handle) { ...@@ -155,17 +156,19 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
void *cqCreate(void *handle, int tid, char *sqlStr, STSchema *pSchema) { void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSchema) {
SCqContext *pContext = handle; SCqContext *pContext = handle;
SCqObj *pObj = calloc(sizeof(SCqObj), 1); SCqObj *pObj = calloc(sizeof(SCqObj), 1);
if (pObj == NULL) return NULL; if (pObj == NULL) return NULL;
pObj->uid = uid;
pObj->tid = tid; pObj->tid = tid;
pObj->sqlStr = malloc(strlen(sqlStr)+1); pObj->sqlStr = malloc(strlen(sqlStr)+1);
strcpy(pObj->sqlStr, sqlStr); strcpy(pObj->sqlStr, sqlStr);
pObj->pSchema = tdDupSchema(pSchema); pObj->pSchema = tdDupSchema(pSchema);
pObj->rowSize = pSchema->tlen;
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
...@@ -213,8 +216,8 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -213,8 +216,8 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
return;
} }
return;
} }
int64_t lastKey = 0; int64_t lastKey = 0;
...@@ -231,6 +234,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -231,6 +234,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqObj *pObj = (SCqObj *)param; SCqObj *pObj = (SCqObj *)param;
SCqContext *pContext = pObj->pContext; SCqContext *pContext = pObj->pContext;
STSchema *pSchema = pObj->pSchema;
if (pObj->pStream == NULL) return; if (pObj->pStream == NULL) return;
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
...@@ -240,18 +244,38 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -240,18 +244,38 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
char *buffer = calloc(size, 1); char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer; SWalHead *pHead = (SWalHead *)buffer;
pHead->msgType = TSDB_MSG_TYPE_SUBMIT; SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
pHead->len = size - sizeof(SWalHead); SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead));
// to do: fill in the SSubmitMsg structure
pSubmit->numOfBlocks = 1;
int32_t flen = 0;
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
flen += TYPE_BYTES[pSchema->columns[i].type];
}
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); SDataRow trow = (SDataRow)pBlk->data;
// to do: fill in the SSubmitBlk strucuture dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
pBlk->tid = pObj->tid;
int toffset = 0;
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
tdAppendColVal(trow, row[i], pSchema->columns[i].type, pSchema->columns[i].bytes, toffset);
toffset += TYPE_BYTES[pSchema->columns[i].type];
}
pBlk->len = htonl(dataRowLen(trow));
pBlk->uid = htobe64(pObj->uid);
pBlk->tid = htonl(pObj->tid);
pBlk->numOfRows = htons(1);
pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0;
pMsg->header.vgId = htonl(pContext->vgId);
pMsg->header.contLen = htonl(size - sizeof(SWalHead));
pMsg->length = pMsg->header.contLen;
pMsg->numOfBlocks = htonl(1);
pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
pHead->len = size - sizeof(SWalHead);
pHead->version = 0;
// write into vnode write queue // write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
......
...@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) { ...@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) { for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
} }
tdFreeSchema(pSchema); tdFreeSchema(pSchema);
......
...@@ -42,7 +42,7 @@ void cqStart(void *handle); ...@@ -42,7 +42,7 @@ void cqStart(void *handle);
void cqStop(void *handle); void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ // cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, int sid, char *sqlStr, STSchema *pSchema); void *cqCreate(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle); void cqDrop(void *handle);
......
...@@ -43,7 +43,7 @@ typedef struct { ...@@ -43,7 +43,7 @@ typedef struct {
void *cqH; void *cqH;
int (*notifyStatus)(void *, int status); int (*notifyStatus)(void *, int status);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema); void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle); void (*cqDropFunc)(void *handle);
void *(*configFunc)(int32_t vgId, int32_t sid); void *(*configFunc)(int32_t vgId, int32_t sid);
} STsdbAppH; } STsdbAppH;
......
...@@ -481,7 +481,7 @@ void tsdbStartStream(TsdbRepoT *repo) { ...@@ -481,7 +481,7 @@ void tsdbStartStream(TsdbRepoT *repo) {
for (int i = 0; i < pRepo->config.maxTables; i++) { for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) { if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
} }
} }
} }
......
...@@ -675,7 +675,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -675,7 +675,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
if (pTable->type == TSDB_STREAM_TABLE && addIdx) { if (pTable->type == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
} }
pMeta->nTables++; pMeta->nTables++;
......
...@@ -210,7 +210,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -210,7 +210,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
SCqCfg cqCfg = {0}; SCqCfg cqCfg = {0};
sprintf(cqCfg.user, "_root"); sprintf(cqCfg.user, "_root");
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, "s1_db0"); // TODO: replace hard coded db name strcpy(cqCfg.db, "db"); // TODO: replace hard coded db name
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToQueue; cqCfg.cqWrite = vnodeWriteToQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册