提交 3adbf1a7 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/wal

...@@ -382,6 +382,7 @@ typedef struct SSqlObj { ...@@ -382,6 +382,7 @@ typedef struct SSqlObj {
typedef struct SSqlStream { typedef struct SSqlStream {
SSqlObj *pSql; SSqlObj *pSql;
const char* dstTable;
uint32_t streamId; uint32_t streamId;
char listed; char listed;
bool isProject; bool isProject;
...@@ -408,6 +409,8 @@ typedef struct SSqlStream { ...@@ -408,6 +409,8 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next; struct SSqlStream *prev, *next;
} SSqlStream; } SSqlStream;
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn); int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
void tscInitMsgsFp(); void tscInitMsgsFp();
......
...@@ -262,6 +262,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -262,6 +262,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
tstrncpy(pSdesc->sql, pStream->pSql->sqlstr, sizeof(pSdesc->sql)); tstrncpy(pSdesc->sql, pStream->pSql->sqlstr, sizeof(pSdesc->sql));
if (pStream->dstTable == NULL) {
pSdesc->dstTable[0] = 0;
} else {
tstrncpy(pSdesc->dstTable, pStream->dstTable, sizeof(pSdesc->dstTable));
}
pSdesc->streamId = htonl(pStream->streamId); pSdesc->streamId = htonl(pStream->streamId);
pSdesc->num = htobe64(pStream->num); pSdesc->num = htobe64(pStream->num);
......
...@@ -535,6 +535,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -535,6 +535,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr); pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr);
} }
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
pStream->dstTable = dstTable;
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) { int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
......
...@@ -57,6 +57,7 @@ typedef struct SCqObj { ...@@ -57,6 +57,7 @@ typedef struct SCqObj {
uint64_t uid; uint64_t uid;
int32_t tid; // table ID int32_t tid; // table ID
int32_t rowSize; // bytes of a row int32_t rowSize; // bytes of a row
char * dstTable;
char * sqlStr; // SQL string char * sqlStr; // SQL string
STSchema * pSchema; // pointer to schema array STSchema * pSchema; // pointer to schema array
void * pStream; void * pStream;
...@@ -185,7 +186,7 @@ void cqStop(void *handle) { ...@@ -185,7 +186,7 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) { void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) {
if (tsEnableStream == 0) { if (tsEnableStream == 0) {
return NULL; return NULL;
} }
...@@ -195,9 +196,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema * ...@@ -195,9 +196,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *
if (pObj == NULL) return NULL; if (pObj == NULL) return NULL;
pObj->uid = uid; pObj->uid = uid;
pObj->tid = tid; pObj->tid = sid;
pObj->sqlStr = malloc(strlen(sqlStr)+1); if (dstTable != NULL) {
strcpy(pObj->sqlStr, sqlStr); pObj->dstTable = strdup(dstTable);
}
pObj->sqlStr = strdup(sqlStr);
pObj->pSchema = tdDupSchema(pSchema); pObj->pSchema = tdDupSchema(pSchema);
pObj->rowSize = schemaTLen(pSchema); pObj->rowSize = schemaTLen(pSchema);
...@@ -247,6 +250,7 @@ void cqDrop(void *handle) { ...@@ -247,6 +250,7 @@ void cqDrop(void *handle) {
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema); tdFreeSchema(pObj->pSchema);
free(pObj->dstTable);
free(pObj->sqlStr); free(pObj->sqlStr);
free(pObj); free(pObj);
...@@ -292,6 +296,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -292,6 +296,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if (pObj->pStream == NULL) { if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
if (pObj->pStream) { if (pObj->pStream) {
tscSetStreamDestTable(pObj->pStream, pObj->dstTable);
pContext->num++; pContext->num++;
cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
} else { } else {
......
...@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) { ...@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) { for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
} }
tdFreeSchema(pSchema); tdFreeSchema(pSchema);
......
...@@ -787,6 +787,7 @@ typedef struct { ...@@ -787,6 +787,7 @@ typedef struct {
typedef struct { typedef struct {
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
char dstTable[TSDB_TABLE_NAME_LEN];
uint32_t streamId; uint32_t streamId;
int64_t num; // number of computing/cycles int64_t num; // number of computing/cycles
int64_t useconds; int64_t useconds;
......
...@@ -42,7 +42,7 @@ void cqStart(void *handle); ...@@ -42,7 +42,7 @@ void cqStart(void *handle);
void cqStop(void *handle); void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ // cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, uint64_t uid, int32_t sid, char *sqlStr, STSchema *pSchema); void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle); void cqDrop(void *handle);
......
...@@ -48,7 +48,7 @@ typedef struct { ...@@ -48,7 +48,7 @@ typedef struct {
void *cqH; void *cqH;
int (*notifyStatus)(void *, int status, int eno); int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema); void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle); void (*cqDropFunc)(void *handle);
} STsdbAppH; } STsdbAppH;
......
此差异已折叠。
...@@ -450,6 +450,12 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -450,6 +450,12 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "dest table");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port"); strcpy(pSchema[cols].name, "ip:port");
...@@ -524,6 +530,10 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v ...@@ -524,6 +530,10 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->dstTable, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
......
...@@ -872,7 +872,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) { ...@@ -872,7 +872,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
for (int i = 0; i < pMeta->maxTables; i++) { for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) { if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1)); tsdbGetTableSchemaImpl(pTable, false, false, -1));
} }
} }
......
...@@ -828,7 +828,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo ...@@ -828,7 +828,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1; if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) { if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1)); tsdbGetTableSchemaImpl(pTable, false, false, -1));
} }
......
...@@ -22,6 +22,7 @@ python3 ./test.py -f insert/insertIntoTwoTables.py ...@@ -22,6 +22,7 @@ python3 ./test.py -f insert/insertIntoTwoTables.py
python3 ./test.py -f insert/before_1970.py python3 ./test.py -f insert/before_1970.py
python3 bug2265.py python3 bug2265.py
#table
python3 ./test.py -f table/alter_wal0.py python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py python3 ./test.py -f table/column_num.py
...@@ -29,6 +30,12 @@ python3 ./test.py -f table/db_table.py ...@@ -29,6 +30,12 @@ python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py #python3 ./test.py -f table/tablename-boundary.py
python3 ./test.py -f table/max_table_length.py python3 ./test.py -f table/max_table_length.py
python3 ./test.py -f table/alter_column.py
python3 ./test.py -f table/boundary.py
python3 ./test.py -f table/create-a-lot.py
python3 ./test.py -f table/create.py
python3 ./test.py -f table/del_stable.py
python3 ./test.py -f table/queryWithTaosdKilled.py
# tag # tag
...@@ -138,9 +145,6 @@ python3 ./test.py -f user/pass_len.py ...@@ -138,9 +145,6 @@ python3 ./test.py -f user/pass_len.py
# stable # stable
python3 ./test.py -f stable/query_after_reset.py python3 ./test.py -f stable/query_after_reset.py
# table
python3 ./test.py -f table/del_stable.py
#query #query
python3 ./test.py -f query/filter.py python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterCombo.py python3 ./test.py -f query/filterCombo.py
......
...@@ -20,6 +20,7 @@ python3 insert/retentionpolicy.py ...@@ -20,6 +20,7 @@ python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f insert/insertIntoTwoTables.py python3 ./test.py -f insert/insertIntoTwoTables.py
#table
python3 ./test.py -f table/alter_wal0.py python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py python3 ./test.py -f table/column_num.py
...@@ -27,6 +28,12 @@ python3 ./test.py -f table/db_table.py ...@@ -27,6 +28,12 @@ python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py #python3 ./test.py -f table/tablename-boundary.py
python3 ./test.py -f table/max_table_length.py python3 ./test.py -f table/max_table_length.py
python3 ./test.py -f table/alter_column.py
python3 ./test.py -f table/boundary.py
python3 ./test.py -f table/create-a-lot.py
python3 ./test.py -f table/create.py
python3 ./test.py -f table/del_stable.py
python3 ./test.py -f table/queryWithTaosdKilled.py
# tag # tag
python3 ./test.py -f tag_lite/filter.py python3 ./test.py -f tag_lite/filter.py
...@@ -135,9 +142,6 @@ python3 ./test.py -f user/pass_len.py ...@@ -135,9 +142,6 @@ python3 ./test.py -f user/pass_len.py
# stable # stable
python3 ./test.py -f stable/query_after_reset.py python3 ./test.py -f stable/query_after_reset.py
# table
python3 ./test.py -f table/del_stable.py
#query #query
python3 ./test.py -f query/filter.py python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterCombo.py python3 ./test.py -f query/filterCombo.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute("create table db.cars(ts timestamp, c int) tags(id int);")
tdSql.execute("create database db2")
tdSql.error("create table db2.car1 using db.cars tags(1)")
tdSql.error("insert into db2.car1 using db1.cars tags(1) values(now, 1);")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册