未验证 提交 5e786294 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2160 from taosdata/patch/stream

Patch/stream
...@@ -404,6 +404,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, ...@@ -404,6 +404,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, void **taos); void *param, void **taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
int doAsyncParseSql(SSqlObj* pSql);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
......
...@@ -40,39 +40,38 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -40,39 +40,38 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { int doAsyncParseSql(SSqlObj* pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to malloc payload");
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return code;
}
pRes->qhandle = 0;
pRes->numOfRows = 1;
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
return tsParseSql(pSql, true);
}
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = param; pSql->param = param;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp; pSql->fp = fp;
sem_init(&pSql->rspSem, 0, 0);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("failed to malloc payload");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return;
}
// todo check for OOM problem
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY); tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
free(pCmd->payload);
return; return;
} }
pRes->qhandle = 0;
pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sqlstr); strtolower(pSql->sqlstr, sqlstr);
tscDump("%p SQL: %s", pSql, pSql->sqlstr);
int32_t code = tsParseSql(pSql, true); int32_t code = doAsyncParseSql(pSql);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -518,15 +517,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -518,15 +517,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pSql->pStream) { if (pSql->pStream) {
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
/* if (!pSql->cmd.parseFinished) {
* NOTE: tsParseSql(pSql, false);
* transfer the sql function for super table query before get meter/metric meta, sem_post(&pSql->rspSem);
* since in callback functions, only tscProcessSql(pStream->pSql) is executed! }
*/ return;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream);
} else { } else {
tscTrace("%p get tableMeta successfully", pSql); tscTrace("%p get tableMeta successfully", pSql);
} }
......
...@@ -515,8 +515,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -515,8 +515,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (ret != 0) { if (ret != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
} }
pCmd->parseFinished = 1;
return TSDB_CODE_SUCCESS; // do not build query message here return TSDB_CODE_SUCCESS; // do not build query message here
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tscLog.h" #include "tscLog.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsched.h" #include "tsched.h"
#include "tcache.h"
#include "tsclient.h" #include "tsclient.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -77,30 +78,23 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { ...@@ -77,30 +78,23 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
int code = tscGetTableMeta(pSql, pTableMetaInfo); int code = tscGetTableMeta(pSql, pTableMetaInfo);
pSql->res.code = code; pSql->res.code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0); code = tscGetSTableVgroupInfo(pSql, 0);
pSql->res.code = code; pSql->res.code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
} }
tscTansformSQLFuncForSTableQuery(pQueryInfo);
// failed to get meter/metric meta, retry in 10sec. // failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime);
return;
}
tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
tscProcessSql(pStream->pSql);
tscIncStreamExecutionCount(pStream); } else {
tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
tscDoQuery(pStream->pSql);
tscIncStreamExecutionCount(pStream);
}
} }
static void tscProcessStreamTimer(void *handle, void *tmrId) { static void tscProcessStreamTimer(void *handle, void *tmrId) {
...@@ -147,7 +141,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -147,7 +141,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
retryDelay); retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
tscClearTableMetaInfo(pTableMetaInfo, true); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), true);
tfree(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay); tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return; return;
...@@ -259,7 +254,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -259,7 +254,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream->numOfRes); pStream->numOfRes);
// release the metric/meter meta information reference, so data in cache can be updated // release the metric/meter meta information reference, so data in cache can be updated
tscClearTableMetaInfo(pTableMetaInfo, false);
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
tfree(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql); tscSetNextLaunchTimer(pStream, pSql);
} }
} }
...@@ -480,45 +477,37 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -480,45 +477,37 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
setErrorInfo(pSql, TSDB_CODE_TSC_OUT_OF_MEMORY, NULL);
return NULL; return NULL;
} }
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
int ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) { SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
setErrorInfo(pSql, ret, NULL); if (pStream == NULL) {
free(pSql); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL; return NULL;
} }
pSql->pStream = pStream;
pSql->sqlstr = strdup(sqlstr); pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
setErrorInfo(pSql, TSDB_CODE_TSC_OUT_OF_MEMORY, NULL); tscError("%p failed to malloc sql string buffer", pSql);
tscFreeSqlObj(pSql);
tfree(pSql); return NULL;;
return NULL;
} }
strtolower(pSql->sqlstr, sqlstr);
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
int32_t code = doAsyncParseSql(pSql);
SSqlInfo SQLInfo = {0}; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tSQLParse(&SQLInfo, pSql->sqlstr); sem_wait(&pSql->rspSem);
tscResetSqlCmdObj(&pSql->cmd);
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
setErrorInfo(pSql, ret, NULL);
tscError("%p open stream failed, sql:%s, code:%d", pSql, sqlstr, TSDB_CODE_TSC_OUT_OF_MEMORY);
tscFreeSqlObj(pSql);
return NULL;
} }
pRes->code = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, pRes->code, pCmd->payload); setErrorInfo(pSql, pRes->code, pCmd->payload);
...@@ -528,15 +517,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -528,15 +517,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return NULL; return NULL;
} }
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
setErrorInfo(pSql, TSDB_CODE_TSC_OUT_OF_MEMORY, NULL);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
...@@ -550,13 +530,13 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -550,13 +530,13 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream->ctime = taosGetTimestamp(pStream->precision); pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey; pStream->etime = pQueryInfo->window.ekey;
pSql->pStream = pStream;
tscAddIntoStreamList(pStream); tscAddIntoStreamList(pStream);
tscSetSlidingWindowInfo(pSql, pStream); tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime); pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime);
int64_t starttime = tscGetLaunchTimestamp(pStream); int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer);
tscTrace("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, tscTrace("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
......
...@@ -145,6 +145,7 @@ void tdFreeDataRow(SDataRow row); ...@@ -145,6 +145,7 @@ void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema); void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
ASSERT(value != NULL); ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
......
...@@ -38,6 +38,7 @@ typedef struct { ...@@ -38,6 +38,7 @@ typedef struct {
int vgId; int vgId;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite; FCqWrite cqWrite;
void *ahandle; void *ahandle;
int num; // number of continuous streams int num; // number of continuous streams
...@@ -48,7 +49,8 @@ typedef struct { ...@@ -48,7 +49,8 @@ typedef struct {
} SCqContext; } SCqContext;
typedef struct SCqObj { typedef struct SCqObj {
int tid; // table ID uint64_t uid;
int32_t tid; // table ID
int rowSize; // bytes of a row int rowSize; // bytes of a row
char * sqlStr; // SQL string char * sqlStr; // SQL string
STSchema * pSchema; // pointer to schema array STSchema * pSchema; // pointer to schema array
...@@ -73,6 +75,14 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { ...@@ -73,6 +75,14 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
strcpy(pContext->user, pCfg->user); strcpy(pContext->user, pCfg->user);
strcpy(pContext->pass, pCfg->pass); strcpy(pContext->pass, pCfg->pass);
const char* db = pCfg->db;
for (const char* p = db; *p != 0; p++) {
if (*p == '.') {
db = p + 1;
break;
}
}
strcpy(pContext->db, db);
pContext->vgId = pCfg->vgId; pContext->vgId = pCfg->vgId;
pContext->cqWrite = pCfg->cqWrite; pContext->cqWrite = pCfg->cqWrite;
pContext->ahandle = ahandle; pContext->ahandle = ahandle;
...@@ -153,17 +163,19 @@ void cqStop(void *handle) { ...@@ -153,17 +163,19 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
void *cqCreate(void *handle, int tid, char *sqlStr, STSchema *pSchema) { void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSchema) {
SCqContext *pContext = handle; SCqContext *pContext = handle;
SCqObj *pObj = calloc(sizeof(SCqObj), 1); SCqObj *pObj = calloc(sizeof(SCqObj), 1);
if (pObj == NULL) return NULL; if (pObj == NULL) return NULL;
pObj->uid = uid;
pObj->tid = tid; pObj->tid = tid;
pObj->sqlStr = malloc(strlen(sqlStr)+1); pObj->sqlStr = malloc(strlen(sqlStr)+1);
strcpy(pObj->sqlStr, sqlStr); strcpy(pObj->sqlStr, sqlStr);
pObj->pSchema = tdDupSchema(pSchema); pObj->pSchema = tdDupSchema(pSchema);
pObj->rowSize = schemaTLen(pSchema);
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
...@@ -207,16 +219,16 @@ void cqDrop(void *handle) { ...@@ -207,16 +219,16 @@ void cqDrop(void *handle) {
} }
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0); pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
if (pContext->dbConn == NULL) { if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
return;
} }
return;
} }
int64_t lastKey = 0; int64_t lastKey = 0;
pObj->pContext = pContext;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
if (pObj->pStream) { if (pObj->pStream) {
pContext->num++; pContext->num++;
...@@ -229,29 +241,49 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -229,29 +241,49 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqObj *pObj = (SCqObj *)param; SCqObj *pObj = (SCqObj *)param;
SCqContext *pContext = pObj->pContext; SCqContext *pContext = pObj->pContext;
STSchema *pSchema = pObj->pSchema;
if (pObj->pStream == NULL) return; if (pObj->pStream == NULL) return;
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
// construct data int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize;
char *buffer = calloc(size, 1); char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer; SWalHead *pHead = (SWalHead *)buffer;
pHead->msgType = TSDB_MSG_TYPE_SUBMIT; SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
pHead->len = size - sizeof(SWalHead); SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead));
// to do: fill in the SSubmitMsg structure
pSubmit->numOfBlocks = 1;
SDataRow trow = (SDataRow)pBlk->data;
tdInitDataRow(trow, pSchema);
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); for (int32_t i = 0; i < pSchema->numOfCols; i++) {
// to do: fill in the SSubmitBlk strucuture STColumn *c = pSchema->columns + i;
pBlk->tid = pObj->tid; char* val = (char*)row[i];
if (IS_VAR_DATA_TYPE(c->type)) {
val -= sizeof(VarDataLenT);
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
}
pBlk->len = htonl(dataRowLen(trow));
pBlk->uid = htobe64(pObj->uid);
pBlk->tid = htonl(pObj->tid);
pBlk->numOfRows = htons(1);
pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0;
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowLen(trow);
pMsg->header.vgId = htonl(pContext->vgId);
pMsg->header.contLen = htonl(pHead->len);
pMsg->length = pMsg->header.contLen;
pMsg->numOfBlocks = htonl(1);
pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
pHead->version = 0;
// write into vnode write queue // write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
free(buffer);
} }
...@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) { ...@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) { for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
} }
tdFreeSchema(pSchema); tdFreeSchema(pSchema);
......
...@@ -22,6 +22,7 @@ extern "C" { ...@@ -22,6 +22,7 @@ extern "C" {
int32_t dnodeInitModules(); int32_t dnodeInitModules();
void dnodeStartModules(); void dnodeStartModules();
void dnodeStartStream();
void dnodeCleanupModules(); void dnodeCleanupModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus); void dnodeProcessModuleStatus(uint32_t moduleStatus);
......
...@@ -124,6 +124,7 @@ int32_t dnodeInitSystem() { ...@@ -124,6 +124,7 @@ int32_t dnodeInitSystem() {
dnodeStartModules(); dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
dnodeStartStream();
dPrint("TDengine is initialized successfully"); dPrint("TDengine is initialized successfully");
......
...@@ -260,11 +260,27 @@ static int32_t dnodeOpenVnodes() { ...@@ -260,11 +260,27 @@ static int32_t dnodeOpenVnodes() {
} }
free(vnodeList); free(vnodeList);
dPrint("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed); dPrint("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeStartStream() {
int32_t vnodeList[TSDB_MAX_VNODES];
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dPrint("Get dnode list failed");
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeStartStream(vnodeList[i]);
}
dPrint("streams started");
}
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES); int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes; int32_t numOfVnodes;
......
...@@ -27,6 +27,7 @@ typedef struct { ...@@ -27,6 +27,7 @@ typedef struct {
int vgId; int vgId;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN + 1];
FCqWrite cqWrite; FCqWrite cqWrite;
} SCqCfg; } SCqCfg;
...@@ -41,7 +42,7 @@ void cqStart(void *handle); ...@@ -41,7 +42,7 @@ void cqStart(void *handle);
void cqStop(void *handle); void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ // cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, int sid, char *sqlStr, STSchema *pSchema); void *cqCreate(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle); void cqDrop(void *handle);
......
...@@ -43,7 +43,7 @@ typedef struct { ...@@ -43,7 +43,7 @@ typedef struct {
void *cqH; void *cqH;
int (*notifyStatus)(void *, int status); int (*notifyStatus)(void *, int status);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema); void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle); void (*cqDropFunc)(void *handle);
void *(*configFunc)(int32_t vgId, int32_t sid); void *(*configFunc)(int32_t vgId, int32_t sid);
} STsdbAppH; } STsdbAppH;
...@@ -118,6 +118,7 @@ int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); ...@@ -118,6 +118,7 @@ int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid); TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid);
void tsdbStartStream(TsdbRepoT *repo);
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
......
...@@ -38,6 +38,7 @@ typedef struct { ...@@ -38,6 +38,7 @@ typedef struct {
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeStartStream(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
......
...@@ -473,6 +473,18 @@ TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) { ...@@ -473,6 +473,18 @@ TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) {
return TSDB_GET_TABLE_LAST_KEY(pTable); return TSDB_GET_TABLE_LAST_KEY(pTable);
} }
void tsdbStartStream(TsdbRepoT *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
}
}
}
STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) { STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) {
// TODO // TODO
return NULL; return NULL;
......
...@@ -150,7 +150,6 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { ...@@ -150,7 +150,6 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
void tsdbOrgMeta(void *pHandle) { void tsdbOrgMeta(void *pHandle) {
STsdbMeta *pMeta = (STsdbMeta *)pHandle; STsdbMeta *pMeta = (STsdbMeta *)pHandle;
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
for (int i = 1; i < pMeta->maxTables; i++) { for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
...@@ -158,13 +157,6 @@ void tsdbOrgMeta(void *pHandle) { ...@@ -158,13 +157,6 @@ void tsdbOrgMeta(void *pHandle) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
} }
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, i, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
}
}
} }
/** /**
...@@ -683,7 +675,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -683,7 +675,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
if (pTable->type == TSDB_STREAM_TABLE && addIdx) { if (pTable->type == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
} }
pMeta->nTables++; pMeta->nTables++;
......
...@@ -208,8 +208,9 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -208,8 +208,9 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
SCqCfg cqCfg = {0}; SCqCfg cqCfg = {0};
sprintf(cqCfg.user, "root"); sprintf(cqCfg.user, "_root");
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToQueue; cqCfg.cqWrite = vnodeWriteToQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
...@@ -277,6 +278,15 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -277,6 +278,15 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeStartStream(int32_t vnode) {
SVnodeObj* pVnode = vnodeAccquireVnode(vnode);
if (pVnode != NULL) {
tsdbStartStream(pVnode->tsdb);
vnodeRelease(pVnode);
}
return TSDB_CODE_SUCCESS;
}
int32_t vnodeClose(int32_t vgId) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) return 0; if (ppVnode == NULL || *ppVnode == NULL) return 0;
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tbNum = 10
rowNum = 20
tdSql.prepare()
tdLog.info("===== step1 =====")
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
for i in range(tbNum):
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
for j in range(rowNum):
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
time.sleep(0.1)
tdLog.info("===== step2 =====")
tdSql.query("select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step3 =====")
tdLog.info("sleeping 120 seconds")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdLog.info("===== step4 =====")
tdSql.execute("drop table s0")
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdLog.info("===== step5 =====")
tdSql.error("select * from s0")
tdLog.info("===== step6 =====")
time.sleep(0.1)
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step7 =====")
tdLog.info("sleeping 120 seconds")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdLog.info("===== step8 =====")
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.checkData(0, 1, rowNum * tbNum)
tdSql.checkData(0, 2, rowNum * tbNum)
tdSql.checkData(0, 3, rowNum * tbNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step9 =====")
tdLog.info("sleeping 120 seconds")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, rowNum * tbNum)
tdSql.checkData(0, 2, rowNum * tbNum)
tdSql.checkData(0, 3, rowNum * tbNum)
tdLog.info("===== step10 =====")
tdSql.execute("drop table s1")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step11 =====")
tdSql.error("select * from s1")
tdLog.info("===== step12 =====")
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step13 =====")
tdLog.info("sleeping 120 seconds")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, rowNum * tbNum)
tdSql.checkData(0, 2, rowNum * tbNum)
tdSql.checkData(0, 3, rowNum * tbNum)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tbNum = 10
rowNum = 20
totalNum = tbNum * rowNum
tdSql.prepare()
tdLog.info("===== step1 =====")
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
for i in range(tbNum):
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
for j in range(rowNum):
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
time.sleep(0.1)
tdLog.info("===== step2 =====")
tdSql.query("select count(col1) from tb0 interval(1d)")
tdSql.checkData(0, 1, rowNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdSql.execute("create table s0 as select count(col1) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step3 =====")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdLog.info("===== step4 =====")
tdSql.execute("drop table s0")
tdSql.query("show tables")
tdSql.checkRows(tbNum)
tdLog.info("===== step5 =====")
tdSql.error("select * from s0")
tdLog.info("===== step6 =====")
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step7 =====")
time.sleep(120)
tdSql.query("select * from s0")
tdSql.checkData(0, 1, rowNum)
tdSql.checkData(0, 2, rowNum)
tdSql.checkData(0, 3, rowNum)
tdLog.info("===== step8 =====")
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.checkData(0, 1, totalNum)
tdSql.checkData(0, 2, totalNum)
tdSql.checkData(0, 3, totalNum)
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step9 =====")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, totalNum)
tdSql.checkData(0, 2, totalNum)
tdSql.checkData(0, 3, totalNum)
tdLog.info("===== step10 =====")
tdSql.execute("drop table s1")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 1)
tdLog.info("===== step11 =====")
tdSql.error("select * from s1")
tdLog.info("===== step12 =====")
tdSql.execute("create table s1 as select count(col1) from stb0 interval(1d)")
tdSql.query("show tables")
tdSql.checkRows(tbNum + 2)
tdLog.info("===== step13 =====")
time.sleep(120)
tdSql.query("select * from s1")
tdSql.checkData(0, 1, totalNum)
#tdSql.checkData(0, 2, None)
#tdSql.checkData(0, 3, None)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -205,8 +205,8 @@ if $data01 != 20 then ...@@ -205,8 +205,8 @@ if $data01 != 20 then
endi endi
print =============== step21 print =============== step21
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step22 print =============== step22
$st = $stPrefix . c1 $st = $stPrefix . c1
......
...@@ -76,20 +76,20 @@ endw ...@@ -76,20 +76,20 @@ endw
sql drop table $mt sql drop table $mt
print =============== step4 print =============== step4
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step5 print =============== step5
$st = $stPrefix . c3 $st = $stPrefix . c3
sql select * from $st sql select * from $st
print ===> select * from $st print ===> select * from $st
print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $data01 != NULL then if $data01 != null then
return -1 return -1
endi endi
if $data02 != NULL then if $data02 != null then
return -1 return -1
endi endi
if $data03 != NULL then if $data03 != null then
return -1 return -1
endi endi
...@@ -187,8 +187,8 @@ $st = $stPrefix . as ...@@ -187,8 +187,8 @@ $st = $stPrefix . as
#sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7, avg(tbcol) as a8, sum(tbcol) as a9, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7 from $mt where ts < now + 4m interval(1d) #sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7, avg(tbcol) as a8, sum(tbcol) as a9, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, count(tbcol) as a7 from $mt where ts < now + 4m interval(1d)
print =============== step9 print =============== step9
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step10 print =============== step10
$st = $stPrefix . c3 $st = $stPrefix . c3
......
...@@ -163,8 +163,8 @@ $st = $stPrefix . as ...@@ -163,8 +163,8 @@ $st = $stPrefix . as
sql create table $st as select count(tbcol) as c from $mt interval(1d) sql create table $st as select count(tbcol) as c from $mt interval(1d)
print =============== step13 print =============== step13
print sleep 22 seconds print sleep 120 seconds
sleep 32000 sleep 120000
print =============== step14 print =============== step14
$st = $stPrefix . c1 $st = $stPrefix . c1
......
system sh/stop_dnodes.sh #system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 #system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0 #system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 10 #system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 10
system sh/exec.sh -n dnode1 -s start #system sh/exec.sh -n dnode1 -s start
sleep 3000 #sleep 3000
sql connect sql connect
print ======================== dnode1 start print ======================== dnode1 start
...@@ -56,14 +56,14 @@ print $data00 $data01 $data02 $data03 ...@@ -56,14 +56,14 @@ print $data00 $data01 $data02 $data03
sql create table $st as select count(*), count(tbcol), count(tbcol2) from $mt interval(10s) sql create table $st as select count(*), count(tbcol), count(tbcol2) from $mt interval(10s)
print =============== step3 print =============== step3
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step4 print =============== step4
sql select * from $st sql select * from $st
print $st ==> $rows1 $data00 $data01 $data02 $data03 print $st ==> $rows1 $data00 $data01 $data02 $data03
if $data13 >= 51 then if $data03 >= 51 then
return -1 return -1
endi endi
...@@ -90,8 +90,8 @@ while $i < $tbNum ...@@ -90,8 +90,8 @@ while $i < $tbNum
endw endw
print =============== step6 print =============== step6
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step7 print =============== step7
......
...@@ -73,8 +73,8 @@ print =============== step3 ...@@ -73,8 +73,8 @@ print =============== step3
sql create table $stt as select count(*) from $tb interval(1d) sql create table $stt as select count(*) from $tb interval(1d)
sql create table $stm as select count(*) from $mt interval(1d) sql create table $stm as select count(*) from $mt interval(1d)
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $stt sql select * from $stt
print select count(*) from $stt ===> $data00 $data01 print select count(*) from $stt ===> $data00 $data01
...@@ -152,8 +152,8 @@ print =============== step8 ...@@ -152,8 +152,8 @@ print =============== step8
sql create table $stt as select count(*) from $tb interval(1d) sql create table $stt as select count(*) from $tb interval(1d)
sql create table $stm as select count(*) from $mt interval(1d) sql create table $stm as select count(*) from $mt interval(1d)
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $stt sql select * from $stt
sleep 1000 sleep 1000
......
...@@ -78,8 +78,8 @@ if $rows != 11 then ...@@ -78,8 +78,8 @@ if $rows != 11 then
endi endi
print =============== step3 print =============== step3
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 20 then if $data01 != 20 then
...@@ -112,8 +112,8 @@ if $rows != 11 then ...@@ -112,8 +112,8 @@ if $rows != 11 then
endi endi
print =============== step7 print =============== step7
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 20 then if $data01 != 20 then
...@@ -155,8 +155,8 @@ if $rows != 12 then ...@@ -155,8 +155,8 @@ if $rows != 12 then
endi endi
print =============== step9 print =============== step9
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 200 then if $data01 != 200 then
...@@ -190,8 +190,8 @@ if $rows != 12 then ...@@ -190,8 +190,8 @@ if $rows != 12 then
endi endi
print =============== step13 print =============== step13
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 200 then if $data01 != 200 then
......
...@@ -72,8 +72,8 @@ if $rows != 11 then ...@@ -72,8 +72,8 @@ if $rows != 11 then
endi endi
print =============== step3 print =============== step3
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 20 then if $data01 != 20 then
...@@ -100,8 +100,8 @@ if $rows != 11 then ...@@ -100,8 +100,8 @@ if $rows != 11 then
endi endi
print =============== step7 print =============== step7
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 print select * from $st => $data01
if $data01 != 20 then if $data01 != 20 then
...@@ -143,8 +143,8 @@ if $rows != 12 then ...@@ -143,8 +143,8 @@ if $rows != 12 then
endi endi
print =============== step9 print =============== step9
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 $data02, $data03 print select * from $st => $data01 $data02, $data03
if $data01 != 200 then if $data01 != 200 then
...@@ -178,17 +178,17 @@ if $rows != 12 then ...@@ -178,17 +178,17 @@ if $rows != 12 then
endi endi
print =============== step13 print =============== step13
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
sql select * from $st sql select * from $st
print select * from $st => $data01 $data02, $data03 print select * from $st => $data01 $data02, $data03
if $data01 != 200 then if $data01 != 200 then
return -1 return -1
endi endi
if $data02 != NULL then if $data02 != null then
return -1 return -1
endi endi
if $data03 != NULL then if $data03 != null then
return -1 return -1
endi endi
...@@ -79,8 +79,8 @@ $st = $stPrefix . c3 ...@@ -79,8 +79,8 @@ $st = $stPrefix . c3
sql create table $st as select count(tbcol2) from $tb interval(1d) sql create table $st as select count(tbcol2) from $tb interval(1d)
print =============== step5 print =============== step5
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step6 print =============== step6
$st = $stPrefix . c1 $st = $stPrefix . c1
...@@ -173,8 +173,8 @@ $st = $stPrefix . c3 ...@@ -173,8 +173,8 @@ $st = $stPrefix . c3
sql create table $st as select count(*), count(tbcol), count(tbcol2) from $tb interval(1d) sql create table $st as select count(*), count(tbcol), count(tbcol2) from $tb interval(1d)
print =============== step10 print =============== step10
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step11 print =============== step11
#$st = $stPrefix . c3 #$st = $stPrefix . c3
......
...@@ -79,8 +79,8 @@ sleep 1000 ...@@ -79,8 +79,8 @@ sleep 1000
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
print =============== step4 print =============== step4
print sleep 23 seconds print sleep 120 seconds
sleep 23000 sleep 120000
print =============== step5 print =============== step5
$i = 1 $i = 1
......
...@@ -214,8 +214,8 @@ sql select count(tbcol) from $tb where ts < now + 4m interval(1d) group by tgcol ...@@ -214,8 +214,8 @@ sql select count(tbcol) from $tb where ts < now + 4m interval(1d) group by tgcol
step20: step20:
print =============== step21 print =============== step21
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step22 print =============== step22
$st = $stPrefix . c1 $st = $stPrefix . c1
......
...@@ -71,20 +71,20 @@ print =============== step3 ...@@ -71,20 +71,20 @@ print =============== step3
sql drop table $tb sql drop table $tb
print =============== step4 print =============== step4
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step5 print =============== step5
$st = $stPrefix . c3 $st = $stPrefix . c3
sql select * from $st sql select * from $st
print ===> select * from $st print ===> select * from $st
print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print ===> $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $data01 != NULL then if $data01 != null then
return -1 return -1
endi endi
if $data02 != NULL then if $data02 != null then
return -1 return -1
endi endi
if $data03 != NULL then if $data03 != null then
return -1 return -1
endi endi
...@@ -191,8 +191,8 @@ $st = $stPrefix . as ...@@ -191,8 +191,8 @@ $st = $stPrefix . as
#sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, stddev(tbcol) as a7, percentile(tbcol, 1) as a8, count(tbcol) as a9, leastsquares(tbcol, 1, 1) as a10 from $tb where ts < now + 4m interval(1d) #sql create table $st as select avg(tbcol) as a1, sum(tbcol) as a2, min(tbcol) as a3, max(tbcol) as a4, first(tbcol) as a5, last(tbcol) as a6, stddev(tbcol) as a7, percentile(tbcol, 1) as a8, count(tbcol) as a9, leastsquares(tbcol, 1, 1) as a10 from $tb where ts < now + 4m interval(1d)
print =============== step10 print =============== step10
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step11 print =============== step11
$st = $stPrefix . c3 $st = $stPrefix . c3
......
...@@ -196,8 +196,8 @@ $st = $stPrefix . as ...@@ -196,8 +196,8 @@ $st = $stPrefix . as
sql create table $st as select count(tbcol) as c from $tb interval(1d) sql create table $st as select count(tbcol) as c from $tb interval(1d)
print =============== step16 print =============== step16
print sleep 22 seconds print sleep 120 seconds
sleep 22000 sleep 120000
print =============== step17 print =============== step17
$st = $stPrefix . c1 $st = $stPrefix . c1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册