提交 3addb954 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'new' into dd

...@@ -17,11 +17,12 @@ include(${TD_SUPPORT_DIR}/cmake.platform) ...@@ -17,11 +17,12 @@ include(${TD_SUPPORT_DIR}/cmake.platform)
include(${TD_SUPPORT_DIR}/cmake.define) include(${TD_SUPPORT_DIR}/cmake.define)
include(${TD_SUPPORT_DIR}/cmake.options) include(${TD_SUPPORT_DIR}/cmake.options)
include(${TD_SUPPORT_DIR}/cmake.version) include(${TD_SUPPORT_DIR}/cmake.version)
include(${TD_SUPPORT_DIR}/cmake.install)
# contrib # contrib
add_subdirectory(contrib) add_subdirectory(contrib)
set_property(GLOBAL PROPERTY GLOBAL_DEPENDS_NO_CYCLES OFF)
# api # api
add_library(api INTERFACE) add_library(api INTERFACE)
target_include_directories(api INTERFACE "include/client") target_include_directories(api INTERFACE "include/client")
...@@ -36,8 +37,7 @@ add_subdirectory(source) ...@@ -36,8 +37,7 @@ add_subdirectory(source)
add_subdirectory(tools) add_subdirectory(tools)
add_subdirectory(utils) add_subdirectory(utils)
add_subdirectory(examples/c) add_subdirectory(examples/c)
include(${TD_SUPPORT_DIR}/cmake.install)
# docs # docs
add_subdirectory(docs/doxgen) add_subdirectory(docs/doxgen)
\ No newline at end of file
# tests (TODO)
...@@ -5,7 +5,7 @@ toc_max_heading_level: 4 ...@@ -5,7 +5,7 @@ toc_max_heading_level: 4
description: "taosBenchmark (once called taosdemo ) is a tool for testing the performance of TDengine." description: "taosBenchmark (once called taosdemo ) is a tool for testing the performance of TDengine."
--- ---
## Introduction # Introduction
taosBenchmark (formerly taosdemo ) is a tool for testing the performance of TDengine products. taosBenchmark can test the performance of TDengine's insert, query, and subscription functions and simulate large amounts of data generated by many devices. taosBenchmark can be configured to generate user defined databases, supertables, subtables, and the time series data to populate these for performance benchmarking. taosBenchmark is highly configurable and some of the configurations include the time interval for inserting data, the number of working threads and the capability to insert disordered data. The installer provides taosdemo as a soft link to taosBenchmark for compatibility with past users. taosBenchmark (formerly taosdemo ) is a tool for testing the performance of TDengine products. taosBenchmark can test the performance of TDengine's insert, query, and subscription functions and simulate large amounts of data generated by many devices. taosBenchmark can be configured to generate user defined databases, supertables, subtables, and the time series data to populate these for performance benchmarking. taosBenchmark is highly configurable and some of the configurations include the time interval for inserting data, the number of working threads and the capability to insert disordered data. The installer provides taosdemo as a soft link to taosBenchmark for compatibility with past users.
...@@ -334,9 +334,9 @@ The configuration parameters for specifying super table tag columns and data col ...@@ -334,9 +334,9 @@ The configuration parameters for specifying super table tag columns and data col
- **name** : The name of the column, if used together with count, e.g. "name": "current", "count":3, then the names of the 3 columns are current, current_2. current_3. - **name** : The name of the column, if used together with count, e.g. "name": "current", "count":3, then the names of the 3 columns are current, current_2. current_3.
- **min**: The minimum value of the column/label of the data type. - **min**: The minimum value of the column/label of the data type. The generated value will equal or large than the minimum value.
- **max**: The maximum value of the column/label of the data type. - **max**: The maximum value of the column/label of the data type. The generated value will less than the maxium value.
- **values**: The value field of the nchar/binary column/label, which will be chosen randomly from the values. - **values**: The value field of the nchar/binary column/label, which will be chosen randomly from the values.
......
...@@ -334,9 +334,9 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\) ...@@ -334,9 +334,9 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
- **name** : 列的名字,若与 count 同时使用,比如 "name":"current", "count":3, 则 3 个列的名字分别为 current, current_2. current_3。 - **name** : 列的名字,若与 count 同时使用,比如 "name":"current", "count":3, 则 3 个列的名字分别为 current, current_2. current_3。
- **min** : 数据类型的 列/标签 的最小值。 - **min** : 数据类型的 列/标签 的最小值。生成的值将大于或等于最小值。
- **max** : 数据类型的 列/标签 的最大值。 - **max** : 数据类型的 列/标签 的最大值。生成的值将小于最小值。
- **values** : nchar/binary 列/标签的值域,将从值中随机选择。 - **values** : nchar/binary 列/标签的值域,将从值中随机选择。
......
...@@ -49,28 +49,28 @@ int32_t init_env() { ...@@ -49,28 +49,28 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int, j varchar(20)) tags(a varchar(20))");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)"); pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags('c1')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags('c2')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags(3)"); pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags('c3')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -96,7 +96,8 @@ int32_t create_stream() { ...@@ -96,7 +96,8 @@ int32_t create_stream() {
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, pRes = taos_query(pConn,
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)"); /*"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");*/
"create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix_'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -117,6 +117,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { ...@@ -117,6 +117,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
mndTransDrop(pTrans); mndTransDrop(pTrans);
tDeleteSMqConsumerObj(pConsumerNew);
return 0; return 0;
FAIL: FAIL:
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
......
...@@ -239,7 +239,9 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { ...@@ -239,7 +239,9 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes; pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
} }
} }
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); if (pObj->pDnode != NULL) {
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
}
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);
} }
......
...@@ -182,6 +182,11 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { ...@@ -182,6 +182,11 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq); tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "commit-offset"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "commit-offset");
if (pTrans == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDecoderClear(&decoder);
return -1;
}
for (int32_t i = 0; i < commitOffsetReq.num; i++) { for (int32_t i = 0; i < commitOffsetReq.num; i++) {
SMqOffset *pOffset = &commitOffsetReq.offsets[i]; SMqOffset *pOffset = &commitOffsetReq.offsets[i];
......
...@@ -222,6 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -222,6 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
} }
SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
taosMemoryFree(buf);
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
taosMemoryFree(buf); taosMemoryFree(buf);
......
...@@ -35,7 +35,11 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -35,7 +35,11 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
*ppMeta = NULL; *ppMeta = NULL;
// create handle // create handle
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_META_DIR) + 3; if (pVnode->pTfs) {
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_META_DIR) + 3;
} else {
slen = strlen(pVnode->path) + strlen(VNODE_META_DIR) + 2;
}
if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + slen)) == NULL) { if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + slen)) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -43,8 +47,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -43,8 +47,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
metaInitLock(pMeta); metaInitLock(pMeta);
pMeta->path = (char *)&pMeta[1]; pMeta->path = (char *)&pMeta[1];
sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, if (pVnode->pTfs) {
VNODE_META_DIR); sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
VNODE_META_DIR);
} else {
sprintf(pMeta->path, "%s%s%s", pVnode->path, TD_DIRSEP, VNODE_META_DIR);
}
taosRealPath(pMeta->path, NULL, slen); taosRealPath(pMeta->path, NULL, slen);
pMeta->pVnode = pVnode; pMeta->pVnode = pVnode;
......
...@@ -170,6 +170,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { ...@@ -170,6 +170,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdbFree(pKey);
tdbTbcClose(pCur); tdbTbcClose(pCur);
return -1; return -1;
} }
......
...@@ -417,6 +417,7 @@ _err: ...@@ -417,6 +417,7 @@ _err:
// EXPOSED APIS ==================================================================================== // EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb) { int32_t tsdbFSOpen(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pTsdb->pVnode;
// open handle // open handle
pTsdb->fs.pDelFile = NULL; pTsdb->fs.pDelFile = NULL;
...@@ -429,8 +430,12 @@ int32_t tsdbFSOpen(STsdb *pTsdb) { ...@@ -429,8 +430,12 @@ int32_t tsdbFSOpen(STsdb *pTsdb) {
// load fs or keep empty // load fs or keep empty
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, if (pVnode->pTfs) {
pTsdb->path, TD_DIRSEP); snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
pTsdb->path, TD_DIRSEP);
} else {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
}
if (!taosCheckExistFile(fname)) { if (!taosCheckExistFile(fname)) {
// empty one // empty one
......
...@@ -57,10 +57,13 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee ...@@ -57,10 +57,13 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
} else { } else {
memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg)); memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg));
} }
// pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
// create dir // create dir
tfsMkdir(pVnode->pTfs, pTsdb->path); if (pVnode->pTfs) {
tfsMkdir(pVnode->pTfs, pTsdb->path);
} else {
taosMkDir(pTsdb->path);
}
// open tsdb // open tsdb
if (tsdbFSOpen(pTsdb) < 0) { if (tsdbFSOpen(pTsdb) < 0) {
......
...@@ -227,7 +227,11 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -227,7 +227,11 @@ int vnodeCommit(SVnode *pVnode) {
info.state.committed = pVnode->state.applied; info.state.committed = pVnode->state.applied;
info.state.commitTerm = pVnode->state.applyTerm; info.state.commitTerm = pVnode->state.applyTerm;
info.state.commitID = pVnode->state.commitID; info.state.commitID = pVnode->state.commitID;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
if (vnodeSaveInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
......
...@@ -138,12 +138,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -138,12 +138,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
taosRealPath(tdir, NULL, sizeof(tdir)); taosRealPath(tdir, NULL, sizeof(tdir));
// for test tsdb snapshot
#if 0
pVnode->config.walCfg.segSize = 200;
pVnode->config.walCfg.retentionSize = 2000;
#endif
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) { if (pVnode->pWal == NULL) {
vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
...@@ -159,12 +153,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -159,12 +153,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
#if !VNODE_AS_LIB
// open query // open query
if (vnodeQueryOpen(pVnode)) { if (vnodeQueryOpen(pVnode)) {
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
#endif
// vnode begin // vnode begin
if (vnodeBegin(pVnode) < 0) { if (vnodeBegin(pVnode) < 0) {
...@@ -173,11 +169,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -173,11 +169,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
#if !VNODE_AS_LIB
// open sync // open sync
if (vnodeSyncOpen(pVnode, dir)) { if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
#endif
return pVnode; return pVnode;
......
...@@ -819,11 +819,12 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -819,11 +819,12 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
int32_t tsize, ret; int32_t tsize, ret;
SEncoder encoder = {0}; SEncoder encoder = {0};
SArray *newTbUids = NULL; SArray *newTbUids = NULL;
SVStatis statis = {0};
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
pRsp->code = 0; pRsp->code = 0;
pSubmitReq->version = version; pSubmitReq->version = version;
atomic_fetch_add_64(&pVnode->statis.nBatchInsert, 1); statis.nBatchInsert = 1;
#ifdef TD_DEBUG_PRINT_ROW #ifdef TD_DEBUG_PRINT_ROW
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__); vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
...@@ -943,18 +944,21 @@ _exit: ...@@ -943,18 +944,21 @@ _exit:
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);
atomic_fetch_add_64(&pVnode->statis.nInsert, submitRsp.numOfRows);
atomic_fetch_add_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows);
// TODO: the partial success scenario and the error case // TODO: the partial success scenario and the error case
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
// 1/level 2. // 1/level 2.
// TODO: refactor // TODO: refactor
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1); statis.nBatchInsertSuccess = 1;
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
} }
// N.B. not strict as the following procedure is not atomic
atomic_add_fetch_64(&pVnode->statis.nInsert, submitRsp.numOfRows);
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows);
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert);
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess);
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
return 0; return 0;
} }
......
...@@ -474,6 +474,10 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -474,6 +474,10 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (msgcb == NULL) {
return -1;
}
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
if (code != 0) { if (code != 0) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
add_subdirectory(tdb)
add_subdirectory(cache)
add_subdirectory(transport) add_subdirectory(transport)
add_subdirectory(wal)
add_subdirectory(monitor)
add_subdirectory(tfs)
add_subdirectory(sync) add_subdirectory(sync)
add_subdirectory(tdb) add_subdirectory(qcom)
add_subdirectory(nodes)
add_subdirectory(catalog)
add_subdirectory(scalar)
add_subdirectory(function)
add_subdirectory(index) add_subdirectory(index)
add_subdirectory(wal)
add_subdirectory(parser) add_subdirectory(parser)
add_subdirectory(scheduler) add_subdirectory(scheduler)
add_subdirectory(cache)
add_subdirectory(catalog)
add_subdirectory(executor) add_subdirectory(executor)
add_subdirectory(stream) add_subdirectory(stream)
add_subdirectory(planner) add_subdirectory(planner)
add_subdirectory(function)
add_subdirectory(qcom)
add_subdirectory(qworker) add_subdirectory(qworker)
add_subdirectory(tfs)
add_subdirectory(monitor)
add_subdirectory(nodes)
add_subdirectory(scalar)
add_subdirectory(command) add_subdirectory(command)
\ No newline at end of file
...@@ -1341,7 +1341,8 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) { ...@@ -1341,7 +1341,8 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
void* pData = colDataGetData(pCol, 0); void* pData = colDataGetData(pCol, 0);
// TODO check tbname validation // TODO check tbname validation
if (pData != (void*)-1 && pData != NULL) { if (pData != (void*)-1 && pData != NULL) {
memcpy(pBlock->info.parTbName, varDataVal(pData), varDataLen(pData)); memcpy(pBlock->info.parTbName, varDataVal(pData), TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN));
pBlock->info.parTbName[TSDB_TABLE_NAME_LEN - 1] = 0;
} else { } else {
pBlock->info.parTbName[0] = 0; pBlock->info.parTbName[0] = 0;
} }
......
...@@ -3611,6 +3611,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { ...@@ -3611,6 +3611,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pUpdateRes); blockDataDestroy(pInfo->pUpdateRes);
destroySqlFunctionCtx(pInfo->pDummyCtx, 0); destroySqlFunctionCtx(pInfo->pDummyCtx, 0);
taosHashCleanup(pInfo->pStDeleted); taosHashCleanup(pInfo->pStDeleted);
taosHashCleanup(pInfo->pGroupIdTbNameMap);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -4670,6 +4671,7 @@ void destroyStreamStateOperatorInfo(void* param) { ...@@ -4670,6 +4671,7 @@ void destroyStreamStateOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
taosHashCleanup(pInfo->pSeDeleted); taosHashCleanup(pInfo->pSeDeleted);
taosHashCleanup(pInfo->pGroupIdTbNameMap);
destroySqlFunctionCtx(pInfo->pDummyCtx, 0); destroySqlFunctionCtx(pInfo->pDummyCtx, 0);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
......
...@@ -14,7 +14,14 @@ target_include_directories( ...@@ -14,7 +14,14 @@ target_include_directories(
target_link_libraries( target_link_libraries(
function function
PRIVATE os util common nodes scalar qcom transport stream PRIVATE os
PRIVATE util
PRIVATE common
PRIVATE nodes
PRIVATE qcom
PRIVATE scalar
PRIVATE transport
PRIVATE stream
PUBLIC uv_a PUBLIC uv_a
) )
......
...@@ -2914,6 +2914,7 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* ...@@ -2914,6 +2914,7 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode*
} else { } else {
intervalRange = pInterval->datum.i; intervalRange = pInterval->datum.i;
} }
if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
} }
...@@ -3140,6 +3141,12 @@ static int32_t translateInterpEvery(STranslateContext* pCxt, SNode** pEvery) { ...@@ -3140,6 +3141,12 @@ static int32_t translateInterpEvery(STranslateContext* pCxt, SNode** pEvery) {
code = translateExpr(pCxt, pEvery); code = translateExpr(pCxt, pEvery);
} }
int64_t interval = ((SValueNode*)(*pEvery))->datum.i;
if (interval == 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE,
"Unsupported time unit in EVERY clause");
}
return code; return code;
} }
......
...@@ -8,7 +8,7 @@ target_include_directories( ...@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
qcom qcom
PRIVATE os util transport nodes PRIVATE os util transport
) )
if(${BUILD_TEST}) if(${BUILD_TEST})
......
...@@ -8,8 +8,14 @@ target_include_directories( ...@@ -8,8 +8,14 @@ target_include_directories(
) )
target_link_libraries(scalar target_link_libraries(scalar
PRIVATE os util common nodes function qcom vnode PRIVATE os
) PRIVATE util
PRIVATE common
PRIVATE nodes
PRIVATE function
PRIVATE qcom
PRIVATE vnode
)
if(${BUILD_TEST}) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)
......
...@@ -73,7 +73,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { ...@@ -73,7 +73,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
} }
// todo refactor // todo refactor
if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) {
goto _err; goto _err;
} }
......
...@@ -16,11 +16,6 @@ target_link_libraries( ...@@ -16,11 +16,6 @@ target_link_libraries(
) )
if (${BUILD_WITH_UV_TRANS}) if (${BUILD_WITH_UV_TRANS})
if (${BUILD_WITH_UV}) if (${BUILD_WITH_UV})
target_include_directories(
transport
PUBLIC "${TD_SOURCE_DIR}/contrib/libuv/include"
)
target_link_libraries( target_link_libraries(
transport transport
PUBLIC uv_a PUBLIC uv_a
......
...@@ -35,8 +35,8 @@ int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVe ...@@ -35,8 +35,8 @@ int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVe
int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; } int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; }
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { static FORCE_INLINE void walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
} }
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
...@@ -150,6 +150,7 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -150,6 +150,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
const char* idxPattern = "^[0-9]+.idx$"; const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern; regex_t logRegPattern;
regex_t idxRegPattern; regex_t idxRegPattern;
bool fixed = false;
regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
...@@ -206,6 +207,77 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -206,6 +207,77 @@ int walCheckAndRepairMeta(SWal* pWal) {
actualFileNum = taosArrayGetSize(pLogInfoArray); actualFileNum = taosArrayGetSize(pLogInfoArray);
#endif #endif
{
int32_t i = 0, j = 0;
while (i < actualFileNum && j < metaFileNum) {
SWalFileInfo* pActualFile = taosArrayGet(actualLog, i);
SWalFileInfo* pMetaFile = taosArrayGet(pWal->fileInfoSet, j);
if (pActualFile->firstVer < pMetaFile->firstVer) {
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
i++;
} else if (pActualFile->firstVer > pMetaFile->firstVer) {
taosArrayRemove(pWal->fileInfoSet, j);
metaFileNum--;
} else {
i++;
j++;
}
}
if (i == actualFileNum && j == metaFileNum) {
if (j > 0) {
SWalFileInfo* pLastInfo = taosArrayGet(pWal->fileInfoSet, j - 1);
int64_t fsize = 0;
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastInfo->firstVer, fNameStr);
taosStatFile(fNameStr, &fsize, NULL);
if (pLastInfo->fileSize != fsize) {
fixed = true;
pLastInfo->fileSize = fsize;
pLastInfo->lastVer = walScanLogGetLastVer(pWal);
}
}
} else {
fixed = true;
while (i < actualFileNum) {
SWalFileInfo* pActualFile = taosArrayGet(actualLog, i);
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pActualFile->firstVer, fNameStr);
taosStatFile(fNameStr, &pActualFile->fileSize, NULL);
if (pActualFile->fileSize == 0) {
ASSERT(i == actualFileNum - 1);
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
break;
}
if (i < actualFileNum - 1) {
pActualFile->lastVer = ((SWalFileInfo*)taosArrayGet(actualLog, i + 1))->firstVer - 1;
taosArrayPush(pWal->fileInfoSet, pActualFile);
i++;
} else {
pActualFile = taosArrayPush(pWal->fileInfoSet, pActualFile);
pActualFile->lastVer = walScanLogGetLastVer(pWal);
if (pActualFile->lastVer == -1) {
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
taosArrayPop(pWal->fileInfoSet);
}
break;
}
}
}
}
#if 0
if (metaFileNum > actualFileNum) { if (metaFileNum > actualFileNum) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) { } else if (metaFileNum < actualFileNum) {
...@@ -214,32 +286,30 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -214,32 +286,30 @@ int walCheckAndRepairMeta(SWal* pWal) {
taosArrayPush(pWal->fileInfoSet, pFileInfo); taosArrayPush(pWal->fileInfoSet, pFileInfo);
} }
} }
#endif
taosArrayDestroy(actualLog); taosArrayDestroy(actualLog);
actualFileNum = taosArrayGetSize(pWal->fileInfoSet);
pWal->writeCur = actualFileNum - 1; pWal->writeCur = actualFileNum - 1;
if (actualFileNum > 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
/*ASSERT(fileSize != 0);*/
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
pLastFileInfo->fileSize = fileSize;
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
ASSERT(pWal->vers.lastVer != -1);
int code = walSaveMeta(pWal); if (actualFileNum > 0) {
if (code < 0) { int64_t fLastVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur))->lastVer;
return -1; if (fLastVer != -1 && pWal->vers.lastVer != fLastVer) {
} fixed = true;
pWal->vers.lastVer = fLastVer;
}
int64_t fFirstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
if (fFirstVer != pWal->vers.firstVer) {
fixed = true;
pWal->vers.firstVer = fFirstVer;
} }
} }
if (fixed) {
walSaveMeta(pWal);
}
return 0; return 0;
} }
...@@ -530,6 +600,11 @@ int walLoadMeta(SWal* pWal) { ...@@ -530,6 +600,11 @@ int walLoadMeta(SWal* pWal) {
// read metafile // read metafile
int64_t fileSize = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL); taosStatFile(fnameStr, &fileSize, NULL);
if (fileSize == 0) {
taosRemoveFile(fnameStr);
wDebug("vgId:%d wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
return -1;
}
int size = (int)fileSize; int size = (int)fileSize;
char* buf = taosMemoryMalloc(size + 5); char* buf = taosMemoryMalloc(size + 5);
if (buf == NULL) { if (buf == NULL) {
...@@ -540,6 +615,7 @@ int walLoadMeta(SWal* pWal) { ...@@ -540,6 +615,7 @@ int walLoadMeta(SWal* pWal) {
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosMemoryFree(buf);
return -1; return -1;
} }
if (taosReadFile(pFile, buf, size) != size) { if (taosReadFile(pFile, buf, size) != size) {
......
...@@ -279,7 +279,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -279,7 +279,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
pRead->pHead = ptr; pRead->pHead = (SWalCkHead *)ptr;
pReadHead = &pRead->pHead->head; pReadHead = &pRead->pHead->head;
pRead->capacity = pReadHead->bodyLen; pRead->capacity = pReadHead->bodyLen;
} }
...@@ -399,7 +399,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -399,7 +399,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
*ppHead = ptr; *ppHead = (SWalCkHead *)ptr;
pReadHead = &((*ppHead)->head); pReadHead = &((*ppHead)->head);
pRead->capacity = pReadHead->bodyLen; pRead->capacity = pReadHead->bodyLen;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册