提交 cd372283 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into fix/3.0_bugfix_wxy

......@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG 0d5663d
GIT_TAG ff7de07
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
<svg xmlns="http://www.w3.org/2000/svg" viewBox="-0.5 -1 32 32">
<g fill="#5865f2">
<path
d="M26.0015 6.9529C24.0021 6.03845 21.8787 5.37198 19.6623 5C19.3833 5.48048 19.0733 6.13144 18.8563 6.64292C16.4989 6.30193 14.1585 6.30193 11.8336 6.64292C11.6166 6.13144 11.2911 5.48048 11.0276 5C8.79575 5.37198 6.67235 6.03845 4.6869 6.9529C0.672601 12.8736 -0.41235 18.6548 0.130124 24.3585C2.79599 26.2959 5.36889 27.4739 7.89682 28.2489C8.51679 27.4119 9.07477 26.5129 9.55525 25.5675C8.64079 25.2265 7.77283 24.808 6.93587 24.312C7.15286 24.1571 7.36986 23.9866 7.57135 23.8161C12.6241 26.1255 18.0969 26.1255 23.0876 23.8161C23.3046 23.9866 23.5061 24.1571 23.7231 24.312C22.8861 24.808 22.0182 25.2265 21.1037 25.5675C21.5842 26.5129 22.1422 27.4119 22.7621 28.2489C25.2885 27.4739 27.8769 26.2959 30.5288 24.3585C31.1952 17.7559 29.4733 12.0212 26.0015 6.9529ZM10.2527 20.8402C8.73376 20.8402 7.49382 19.4608 7.49382 17.7714C7.49382 16.082 8.70276 14.7025 10.2527 14.7025C11.7871 14.7025 13.0425 16.082 13.0115 17.7714C13.0115 19.4608 11.7871 20.8402 10.2527 20.8402ZM20.4373 20.8402C18.9183 20.8402 17.6768 19.4608 17.6768 17.7714C17.6768 16.082 18.8873 14.7025 20.4373 14.7025C21.9717 14.7025 23.2271 16.082 23.1961 17.7714C23.1961 19.4608 21.9872 20.8402 20.4373 20.8402Z"
></path>
</g>
</svg>
<svg xmlns="http://www.w3.org/2000/svg" viewBox="-1 -2 18 18">
<path
fill="currentColor"
d="M8 0C3.58 0 0 3.58 0 8c0 3.54 2.29 6.53 5.47 7.59.4.07.55-.17.55-.38 0-.19-.01-.82-.01-1.49-2.01.37-2.53-.49-2.69-.94-.09-.23-.48-.94-.82-1.13-.28-.15-.68-.52-.01-.53.63-.01 1.08.58 1.23.82.72 1.21 1.87.87 2.33.66.07-.52.28-.87.51-1.07-1.78-.2-3.64-.89-3.64-3.95 0-.87.31-1.59.82-2.15-.08-.2-.36-1.02.08-2.12 0 0 .67-.21 2.2.82.64-.18 1.32-.27 2-.27.68 0 1.36.09 2 .27 1.53-1.04 2.2-.82 2.2-.82.44 1.1.16 1.92.08 2.12.51.56.82 1.27.82 2.15 0 3.07-1.87 3.75-3.65 3.95.29.25.54.73.54 1.48 0 1.07-.01 1.93-.01 2.2 0 .21.15.46.55.38A8.013 8.013 0 0016 8c0-4.42-3.58-8-8-8z"
></path>
</svg>
......@@ -3,6 +3,12 @@ title: Get Started
description: This article describes how to install TDengine and test its performance.
---
import github from './github.svg'
import discord from './discord.svg'
import twitter from './twitter.svg'
import youtube from './youtube.svg'
import linkedin from './linkedin.svg'
You can install and run TDengine on Linux/Windows/macOS machines as well as Docker containers. You can also deploy TDengine as a managed service with TDengine Cloud.
The full package of TDengine includes the TDengine Server (`taosd`), TDengine Client (`taosc`), taosAdapter for connecting with third-party systems and providing a RESTful interface, a command-line interface, and some tools. In addition to connectors for multiple languages, TDengine also provides a [RESTful interface](/reference/rest-api) through [taosAdapter](/reference/taosadapter).
......@@ -12,4 +18,16 @@ import DocCardList from '@theme/DocCardList';
import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```
\ No newline at end of file
```
### Join TDengine Community
<table width="100%">
<tr align="center">
<td width="20%" style="border:0"><a href="https://github.com/taosdata/TDengine" target="_blank"><img src={github} alt="Star GitHub" width="50" /><p>Star GitHub</p></a></td>
<td width="20%" style="border:0"><a href="https://discord.com/invite/VZdSuUg4pS" target="_blank"><img src={discord} alt="Join Discord" width="50" /><p>Join Discord</p></a></td>
<td width="20%" style="border:0"><a href="https://twitter.com/TDengineDB" target="_blank"><img src={twitter} alt="Follow Twitter" width="50" /><p>Follow Twitter</p></a></td>
<td width="20%" style="border:0"><a href="https://www.youtube.com/@tdengine" target="_blank"><img src={youtube} alt="Subscribe YouTube" width="50" /><p>Subscribe YouTube</p></a></td>
<td width="20%" style="border:0"><a href="https://www.linkedin.com/company/tdengine" target="_blank"><img src={linkedin} alt="Follow LinkedIn" width="50" /><p>Follow LinkedIn</p></a></td>
</tr>
</table>
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 -2 24 24">
<path
fill="rgb(10, 102, 194)"
d="M20.5 2h-17A1.5 1.5 0 002 3.5v17A1.5 1.5 0 003.5 22h17a1.5 1.5 0 001.5-1.5v-17A1.5 1.5 0 0020.5 2zM8 19H5v-9h3zM6.5 8.25A1.75 1.75 0 118.3 6.5a1.78 1.78 0 01-1.8 1.75zM19 19h-3v-4.74c0-1.42-.6-1.93-1.38-1.93A1.74 1.74 0 0013 14.19a.66.66 0 000 .14V19h-3v-9h2.9v1.3a3.11 3.11 0 012.7-1.4c1.55 0 3.36.86 3.36 3.66z"
></path>
</svg>
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 -2 24 24">
<g fill="rgb(29, 155, 240)">
<path
d="M23.643 4.937c-.835.37-1.732.62-2.675.733.962-.576 1.7-1.49 2.048-2.578-.9.534-1.897.922-2.958 1.13-.85-.904-2.06-1.47-3.4-1.47-2.572 0-4.658 2.086-4.658 4.66 0 .364.042.718.12 1.06-3.873-.195-7.304-2.05-9.602-4.868-.4.69-.63 1.49-.63 2.342 0 1.616.823 3.043 2.072 3.878-.764-.025-1.482-.234-2.11-.583v.06c0 2.257 1.605 4.14 3.737 4.568-.392.106-.803.162-1.227.162-.3 0-.593-.028-.877-.082.593 1.85 2.313 3.198 4.352 3.234-1.595 1.25-3.604 1.995-5.786 1.995-.376 0-.747-.022-1.112-.065 2.062 1.323 4.51 2.093 7.14 2.093 8.57 0 13.255-7.098 13.255-13.254 0-.2-.005-.402-.014-.602.91-.658 1.7-1.477 2.323-2.41z"
></path>
</g>
</svg>
<svg xmlns="http://www.w3.org/2000/svg" viewBox="-2 -8 32 32">
<g>
<g>
<path
d="M27.9727 3.12324C27.6435 1.89323 26.6768 0.926623 25.4468 0.597366C23.2197 2.24288e-07 14.285 0 14.285 0C14.285 0 5.35042 2.24288e-07 3.12323 0.597366C1.89323 0.926623 0.926623 1.89323 0.597366 3.12324C2.24288e-07 5.35042 0 10 0 10C0 10 2.24288e-07 14.6496 0.597366 16.8768C0.926623 18.1068 1.89323 19.0734 3.12323 19.4026C5.35042 20 14.285 20 14.285 20C14.285 20 23.2197 20 25.4468 19.4026C26.6768 19.0734 27.6435 18.1068 27.9727 16.8768C28.5701 14.6496 28.5701 10 28.5701 10C28.5701 10 28.5677 5.35042 27.9727 3.12324Z"
fill="#FF0000"
></path>
<path d="M11.4253 14.2854L18.8477 10.0004L11.4253 5.71533V14.2854Z" fill="white"></path>
</g>
</g>
</svg>
......@@ -3,7 +3,9 @@ title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询'
---
import xiaot from './tdengine.webp'
import xiaot from './xiaot.webp'
import channel from './channel.webp'
import official_account from './official-account.webp'
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)
......@@ -16,8 +18,19 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```
### 开发者技术交流群
微信扫描下面二维码,加“小 T”为好友,即可加入“物联网大数据技术前沿群”,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。
<img src={xiaot} alt="小 T 的二维码" width="200" />
### 加入 TDengine 官方社区
微信扫描以下二维码,学习了解 TDengine 的最新技术,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。
<table width="100%">
<tr align="center">
<td style="padding:1em;border:0"><img src={xiaot} alt="小 T 的二维码" width="200" /></td>
<td style="padding:1em;border:0"><img src={channel} alt="TDengine 微信视频号" width="200" /></td>
<td style="padding:1em;border:0"><img src={official_account} alt="TDengine 微信公众号" width="200" /></td>
</tr>
<tr align="center">
<td style="padding:1em;border:0">加入“物联网大数据技术前沿群”<br/>与大家进行技术交流</td>
<td style="padding:1em;border:0">关注 TDengine 微信视频号<br/>收看技术直播与教学视频</td>
<td style="padding:1em;border:0">关注 TDengine 微信公众号<br/>阅读核心技术与行业案例文章</td>
</tr>
</table>
......@@ -46,6 +46,7 @@ extern "C" {
#define TSDB_INS_TABLE_SUBSCRIPTIONS "ins_subscriptions"
#define TSDB_INS_TABLE_TOPICS "ins_topics"
#define TSDB_INS_TABLE_STREAMS "ins_streams"
#define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "perf_smas"
......
......@@ -119,6 +119,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_QUERIES,
TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_APPS,
TSDB_MGMT_TABLE_STREAM_TASKS,
TSDB_MGMT_TABLE_MAX,
} EShowType;
......
......@@ -346,8 +346,8 @@ bool isValidDataType(int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type);
void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type);
void *getDataMin(int32_t type);
void *getDataMax(int32_t type);
void *getDataMin(int32_t type, void* value);
void *getDataMax(int32_t type, void* value);
#ifdef __cplusplus
}
......
......@@ -104,7 +104,7 @@ typedef int32_t (*TUdfDestroyFunc)();
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
typedef uint16_t VarDataLenT; // maxVarDataLen: 65535
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
......
......@@ -78,7 +78,7 @@ static FORCE_INLINE double taos_align_get_double(const char *pBuf) {
{ (*(double *)(x)) = (*(double *)(y)); }
// #endif
typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
typedef uint16_t VarDataLenT; // maxVarDataLen: 65535
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0]
......
......@@ -39,7 +39,7 @@ if not exist %work_dir%\debug\ver-%2-x86 (
md %work_dir%\debug\ver-%2-x86
)
cd %work_dir%\debug\ver-%2-x64
rem #call vcvarsall.bat x64
call vcvarsall.bat x64
cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DWEBSOCKET=true -DBUILD_HTTP=false -DBUILD_TEST=false -DVERNUMBER=%2 -DCPUTYPE=x64
cmake --build .
rd /s /Q C:\TDengine
......
......@@ -61,6 +61,16 @@ Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExc
Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs
[run]
Filename: {sys}\sc.exe; Parameters: "create taosd start= DEMAND binPath= ""C:\\TDengine\\taosd.exe --win_service""" ; Flags: runhidden
Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\TDengine\\taosadapter.exe --win_service""" ; Flags: runhidden
[UninstallRun]
RunOnceId: "stoptaosd"; Filename: {sys}\sc.exe; Parameters: "stop taosd" ; Flags: runhidden
RunOnceId: "stoptaosadapter"; Filename: {sys}\sc.exe; Parameters: "stop taosadapter" ; Flags: runhidden
RunOnceId: "deltaosd"; Filename: {sys}\sc.exe; Parameters: "delete taosd" ; Flags: runhidden
RunOnceId: "deltaosadapter"; Filename: {sys}\sc.exe; Parameters: "delete taosadapter" ; Flags: runhidden
[Registry]
Root: HKLM; Subkey: "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"; \
ValueType: expandsz; ValueName: "Path"; ValueData: "{olddata};C:\TDengine"; \
......
......@@ -134,7 +134,7 @@ static const SSysDbTableSchema userStbsSchema[] = {
};
static const SSysDbTableSchema streamSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
......@@ -145,6 +145,15 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userTblsSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
......@@ -287,6 +296,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_TOPICS, topicSchema, tListLen(topicSchema), false},
{TSDB_INS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema), false},
{TSDB_INS_TABLE_STREAMS, streamSchema, tListLen(streamSchema), false},
{TSDB_INS_TABLE_STREAM_TASKS, streamTaskSchema, tListLen(streamTaskSchema), false},
{TSDB_INS_TABLE_VNODES, vnodesSchema, tListLen(vnodesSchema), true},
};
......
......@@ -277,7 +277,9 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
pColumnInfoData->varmeta.allocLen = len + oldLen;
}
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
if (pColumnInfoData->pData && pSource->pData) { // TD-20382
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
}
pColumnInfoData->varmeta.length = len + oldLen;
} else {
if (finalNumOfRows > (*capacity)) {
......
......@@ -2591,7 +2591,10 @@ int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pR
for (int32_t i = 0; i < numOfBatch; ++i) {
SUseDbRsp usedbRsp = {0};
if (tDeserializeSUseDbRspImp(&decoder, &usedbRsp) < 0) return -1;
if (tDeserializeSUseDbRspImp(&decoder, &usedbRsp) < 0) {
tDecoderClear(&decoder);
return -1;
}
taosArrayPush(pRsp->pArray, &usedbRsp);
}
tEndDecode(&decoder);
......
......@@ -61,26 +61,36 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = {
static float floatMin = -FLT_MAX, floatMax = FLT_MAX;
static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX;
FORCE_INLINE void *getDataMin(int32_t type) {
FORCE_INLINE void *getDataMin(int32_t type, void* value) {
switch (type) {
case TSDB_DATA_TYPE_FLOAT:
return &floatMin;
*(float *)value = floatMin;
break;
case TSDB_DATA_TYPE_DOUBLE:
return &doubleMin;
*(double *)value = doubleMin;
break;
default:
return &tDataTypes[type].minValue;
*(int64_t *)value = tDataTypes[type].minValue;
break;
}
return value;
}
FORCE_INLINE void *getDataMax(int32_t type) {
FORCE_INLINE void *getDataMax(int32_t type, void* value) {
switch (type) {
case TSDB_DATA_TYPE_FLOAT:
return &floatMax;
*(float *)value = floatMax;
break;
case TSDB_DATA_TYPE_DOUBLE:
return &doubleMax;
*(double *)value = doubleMax;
break;
default:
return &tDataTypes[type].maxValue;
*(int64_t *)value = tDataTypes[type].maxValue;
break;
}
return value;
}
bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type < TSDB_DATA_TYPE_MAX; }
......
......@@ -178,8 +178,10 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
if (p->msg != NULL) {
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
}
rpcFreeCont(p->msg);
}
......
......@@ -106,6 +106,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_STREAMS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) {
type = TSDB_MGMT_TABLE_APPS;
} else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAM_TASKS;
} else {
// ASSERT(0);
}
......
......@@ -41,6 +41,8 @@ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
......@@ -62,6 +64,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -891,7 +895,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SName n;
int32_t cols = 0;
char streamName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
......@@ -953,3 +957,105 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break;
// lock
taosRLockLatch(&pStream->lock);
// count task num
int32_t sz = taosArrayGetSize(pStream->tasks);
int32_t count = 0;
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
count += taosArrayGetSize(pLevel);
}
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
// add row for each task
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t levelCnt = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < levelCnt; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
SColumnInfoData *pColInfo;
int32_t cols = 0;
// stream name
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTask->taskId, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
varDataSetLen(nodeType, 5);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->nodeId > 0) {
memcpy(varDataVal(nodeType), "vnode", 5);
} else {
memcpy(varDataVal(nodeType), "snode", 5);
}
colDataAppend(pColInfo, numOfRows, nodeType, false);
// node id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int32_t nodeId = TMAX(pTask->nodeId, 0);
colDataAppend(pColInfo, numOfRows, (const char *)&nodeId, false);
// level
char level[20 + VARSTR_HEADER_SIZE] = {0};
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
memcpy(varDataVal(level), "source", 6);
varDataSetLen(level, 6);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
memcpy(varDataVal(level), "agg", 3);
varDataSetLen(level, 3);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4);
varDataSetLen(level, 4);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&level, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
char status2[20] = {0};
strcpy(status, "normal");
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&status, false);
numOfRows++;
}
}
// unlock
taosRUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
}
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
......@@ -108,8 +108,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)})
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// SRowIter
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
......@@ -333,6 +333,8 @@ struct SVersionRange {
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
int64_t version;
STSRow *pTSRow;
SMemSkipListNode *forwards[0];
};
typedef struct SMemSkipList {
......@@ -772,14 +774,6 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
static FORCE_INLINE int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = tGetI64(p, &pRow->version);
pRow->pTSRow = (STSRow *)(p + n);
n += pRow->pTSRow->len;
return n;
}
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
if (pIter == NULL) return NULL;
......@@ -798,8 +792,9 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
}
}
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
pIter->pRow = &pIter->row;
pIter->pRow->version = pIter->pNode->version;
pIter->pRow->pTSRow = pIter->pNode->pTSRow;
return pIter->pRow;
}
......
......@@ -18,10 +18,10 @@
#define MEM_MIN_HASH 1024
#define SL_MAX_LEVEL 5
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
......@@ -263,30 +263,27 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
}
bool tsdbTbDataIterNext(STbDataIter *pIter) {
SMemSkipListNode *pHead = pIter->pTbData->sl.pHead;
SMemSkipListNode *pTail = pIter->pTbData->sl.pTail;
pIter->pRow = NULL;
if (pIter->backward) {
ASSERT(pIter->pNode != pTail);
ASSERT(pIter->pNode != pIter->pTbData->sl.pTail);
if (pIter->pNode == pHead) {
if (pIter->pNode == pIter->pTbData->sl.pHead) {
return false;
}
pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0);
if (pIter->pNode == pHead) {
if (pIter->pNode == pIter->pTbData->sl.pHead) {
return false;
}
} else {
ASSERT(pIter->pNode != pHead);
ASSERT(pIter->pNode != pIter->pTbData->sl.pHead);
if (pIter->pNode == pTail) {
if (pIter->pNode == pIter->pTbData->sl.pTail) {
return false;
}
pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0);
if (pIter->pNode == pTail) {
if (pIter->pNode == pIter->pTbData->sl.pTail) {
return false;
}
}
......@@ -394,7 +391,7 @@ _err:
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
SMemSkipListNode *px;
SMemSkipListNode *pn;
TSDBKEY *pTKey;
TSDBKEY tKey = {0};
int32_t backward = flags & SL_MOVE_BACKWARD;
int32_t fromPos = flags & SL_MOVE_FROM_POS;
......@@ -413,9 +410,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_BACKWARD(px, iLevel);
while (pn != pTbData->sl.pHead) {
pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
int32_t c = tsdbKeyCmprFn(pTKey, pKey);
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
if (c <= 0) {
break;
} else {
......@@ -442,7 +440,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_FORWARD(px, iLevel);
while (pn != pTbData->sl.pTail) {
int32_t c = tsdbKeyCmprFn(SL_NODE_DATA(pn), pKey);
tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
if (c >= 0) {
break;
} else {
......@@ -467,8 +468,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
return level;
}
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
int8_t forward) {
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version,
STSRow *pRow, int8_t forward) {
int32_t code = 0;
int8_t level;
SMemSkipListNode *pNode;
......@@ -477,13 +478,19 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
// node
level = tsdbMemSkipListRandLevel(&pTbData->sl);
ASSERT(pPool != NULL);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level));
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pNode->level = level;
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow);
pNode->version = version;
pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len);
if (NULL == pNode->pTSRow) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
memcpy(pNode->pTSRow, pRow, pRow->len);
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pn = pos[iLevel];
......@@ -549,7 +556,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
key.ts = row.pTSRow->ts;
nRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 0);
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0);
if (code) {
goto _err;
}
......@@ -570,7 +577,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
}
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1);
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1);
if (code) {
goto _err;
}
......
......@@ -3219,7 +3219,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
SVersionRange* pVerRange, int32_t step) {
while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
rowIndex += step;
continue;
......
......@@ -565,15 +565,15 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
}
}
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = 0;
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
// int32_t n = 0;
n += tPutI64(p, pRow->version);
if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len);
n += pRow->pTSRow->len;
// n += tPutI64(p, pRow->version);
// if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len);
// n += pRow->pTSRow->len;
return n;
}
// return n;
// }
int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
......@@ -1084,7 +1084,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
......@@ -1106,7 +1106,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
......@@ -1151,7 +1151,7 @@ static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSch
ASSERT(pTColumn->type == pColData->type);
SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type};
TDRowValT vt = TD_VTYPE_NONE; // default is NONE
TDRowValT vt = TD_VTYPE_NONE; // default is NONE
SKvRowIdx *pKvIdx = NULL;
while (kvIter < nKvCols) {
......
......@@ -388,8 +388,10 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
if (p->msg) {
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
}
taosMemoryFreeClear(p->msg);
}
......
......@@ -528,7 +528,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
appendTableOptions(buf2, &len, pDbCfg, pCfg);
}
varDataLen(buf2) = len;
varDataLen(buf2) = (len > 65535) ? 65535 : len;
colDataAppend(pCol2, 0, buf2, false);
......
......@@ -513,6 +513,22 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
// taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
// }
// free pFillCol
if (pFillInfo->pFillCol) {
for (int32_t i = 0; i < pFillInfo->numOfCols; i++) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (!pCol->notFillCol) {
if (pCol->fillVal.nType == TSDB_DATA_TYPE_VARBINARY || pCol->fillVal.nType == TSDB_DATA_TYPE_VARCHAR ||
pCol->fillVal.nType == TSDB_DATA_TYPE_NCHAR || pCol->fillVal.nType == TSDB_DATA_TYPE_JSON) {
if (pCol->fillVal.pz) {
taosMemoryFree(pCol->fillVal.pz);
pCol->fillVal.pz = NULL;
}
}
}
}
}
taosMemoryFreeClear(pFillInfo->pTags);
taosMemoryFreeClear(pFillInfo->pFillCol);
taosMemoryFreeClear(pFillInfo);
......
......@@ -189,7 +189,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
}
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
if ((keyLen == pNode->keyLen) && (*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
break;
}
pNode = pNode->next;
......@@ -213,10 +213,12 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) {
SHNode *pNode = pHashObj->hashList[index];
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
const char* p = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
ASSERT(keyLen > 0);
if (pNode->keyLen == keyLen && ((*(pHashObj->equalFp))(p, key, keyLen) == 0)) {
break;
}
pNode = pNode->next;
}
......
......@@ -6062,11 +6062,11 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema**
}
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = TSDB_TABLE_NAME_LEN;
(*pSchema)[0].bytes = SHOW_CREATE_TB_RESULT_FIELD1_LEN;
strcpy((*pSchema)[0].name, "Table");
(*pSchema)[1].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[1].bytes = TSDB_MAX_BINARY_LEN;
(*pSchema)[1].bytes = SHOW_CREATE_TB_RESULT_FIELD2_LEN;
strcpy((*pSchema)[1].name, "Create Table");
return TSDB_CODE_SUCCESS;
......
......@@ -512,15 +512,17 @@ int32_t filterReuseRangeCtx(SFilterRangeCtx *ctx, int32_t type, int32_t options)
}
int32_t filterConvertRange(SFilterRangeCtx *cur, SFilterRange *ra, bool *notNull) {
int64_t tmp = 0;
if (!FILTER_GET_FLAG(ra->sflag, RANGE_FLG_NULL)) {
int32_t sr = cur->pCompareFunc(&ra->s, getDataMin(cur->type));
int32_t sr = cur->pCompareFunc(&ra->s, getDataMin(cur->type, &tmp));
if (sr == 0) {
FILTER_SET_FLAG(ra->sflag, RANGE_FLG_NULL);
}
}
if (!FILTER_GET_FLAG(ra->eflag, RANGE_FLG_NULL)) {
int32_t er = cur->pCompareFunc(&ra->e, getDataMax(cur->type));
int32_t er = cur->pCompareFunc(&ra->e, getDataMax(cur->type, &tmp));
if (er == 0) {
FILTER_SET_FLAG(ra->eflag, RANGE_FLG_NULL);
}
......@@ -696,14 +698,15 @@ int32_t filterAddRangeImpl(void *h, SFilterRange *ra, int32_t optr) {
int32_t filterAddRange(void *h, SFilterRange *ra, int32_t optr) {
SFilterRangeCtx *ctx = (SFilterRangeCtx *)h;
int64_t tmp = 0;
if (FILTER_GET_FLAG(ra->sflag, RANGE_FLG_NULL)) {
SIMPLE_COPY_VALUES(&ra->s, getDataMin(ctx->type));
SIMPLE_COPY_VALUES(&ra->s, getDataMin(ctx->type, &tmp));
// FILTER_CLR_FLAG(ra->sflag, RA_NULL);
}
if (FILTER_GET_FLAG(ra->eflag, RANGE_FLG_NULL)) {
SIMPLE_COPY_VALUES(&ra->e, getDataMax(ctx->type));
SIMPLE_COPY_VALUES(&ra->e, getDataMax(ctx->type, &tmp));
// FILTER_CLR_FLAG(ra->eflag, RA_NULL);
}
......
......@@ -286,9 +286,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
if (pJob->execRes.res) {
SSubmitRsp *sum = pJob->execRes.res;
sum->affectedRows += rsp->affectedRows;
sum->nBlocks += rsp->nBlocks;
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
sum->nBlocks += rsp->nBlocks;
if (rsp->nBlocks > 0 && rsp->pBlocks) {
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
}
taosMemoryFree(rsp->pBlocks);
taosMemoryFree(rsp);
} else {
......
......@@ -114,12 +114,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
return NULL;
}
char statePath[300];
char statePath[1024];
if (!specPath) {
sprintf(statePath, "%s/%d", path, pTask->taskId);
} else {
memset(statePath, 0, 300);
tstrncpy(statePath, path, 300);
memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024);
}
if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) {
goto _err;
......
......@@ -37,43 +37,6 @@
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
// only start once
static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
SyncAppendEntriesReply* pMsg) {
if (beginIndex > endIndex) {
sNError(ths, "snapshot param error, start:%" PRId64 ", end:%" PRId64, beginIndex, endIndex);
return;
}
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL);
if (snapshotSenderIsStart(pSender)) {
sSError(pSender, "snapshot sender already start");
return;
}
SSnapshot snapshot = {
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL;
SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex};
int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
ASSERT(code == 0);
#if 0
if (pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
} else {
if (pReader != NULL) {
ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
}
}
#endif
}
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;
......
......@@ -385,7 +385,7 @@ bool syncIsReadyForRead(int64_t rid) {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
ready = true;
......@@ -1806,7 +1806,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
pNode->pingTimerMS, pNode);
if (code != 0) {
sNError(pNode, "failed to build ping msg");
sError("failed to build ping msg");
rpcFreeCont(rpcMsg.pCont);
return;
}
......@@ -1814,7 +1814,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
sNTrace(pNode, "enqueue ping msg");
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr());
sError("failed to sync enqueue ping msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont);
return;
}
......@@ -1832,11 +1832,14 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
SElectTimer* pElectTimer = param;
SSyncNode* pNode = pElectTimer->pSyncNode;
if (pNode == NULL) return;
if (pNode->syncEqMsg == NULL) return;
SRpcMsg rpcMsg = {0};
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
if (code != 0) {
sNError(pNode, "failed to build elect msg");
sError("failed to build elect msg");
taosMemoryFree(pElectTimer);
return;
}
......@@ -1846,7 +1849,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr());
sError("failed to sync enqueue elect msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont);
taosMemoryFree(pElectTimer);
return;
......@@ -1876,14 +1879,14 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
pNode->heartbeatTimerMS, pNode);
if (code != 0) {
sNError(pNode, "failed to build heartbeat msg");
sError("failed to build heartbeat msg");
return;
}
sNTrace(pNode, "enqueue heartbeat timer");
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr());
sError("failed to enqueue heartbeat msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont);
return;
}
......@@ -1968,7 +1971,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
sNTrace(pNode, "propose msg, type:noop");
code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr());
sError("failed to propose noop msg while enqueue since %s", terrstr());
}
return code;
......@@ -2002,7 +2005,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) {
sNError(ths, "append noop error");
sError("append noop error");
return -1;
}
}
......@@ -2106,7 +2109,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeFollowerCommit(ths, pMsg->fcIndex);
} else {
sNError(ths, "error local cmd");
sError("error local cmd");
}
return 0;
......
......@@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
int64_t tsWriteBegin = taosGetTimestampNs();
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
int64_t tsWriteEnd = taosGetTimestampNs();
int64_t tsElapsed = tsWriteEnd - tsWriteBegin;
if (index < 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
......@@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
}
pEntry->index = index;
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
return 0;
}
......@@ -234,9 +239,13 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
return -1;
}
int64_t ts1 = taosGetTimestampNs();
taosThreadMutexLock(&(pData->mutex));
int64_t ts2 = taosGetTimestampNs();
code = walReadVer(pWalHandle, index);
int64_t ts3 = taosGetTimestampNs();
// code = walReadVerCached(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
......@@ -280,6 +289,18 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
*/
taosThreadMutexUnlock(&(pData->mutex));
int64_t ts4 = taosGetTimestampNs();
int64_t tsElapsed = ts4 - ts1;
int64_t tsElapsedLock = ts2 - ts1;
int64_t tsElapsedRead = ts3 - ts2;
int64_t tsElapsedBuild = ts4 - ts3;
sNTrace(pData->pSyncNode,
"read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64
", elapsed-build:%" PRId64,
index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);
return code;
}
......
......@@ -72,7 +72,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
SRpcMsg rpcMsg = {0};
SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry;
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
if (code == 0) {
......@@ -99,6 +99,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
}
}
syncEntryDestory(pEntry);
// prepare msg
ASSERT(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId;
......@@ -140,9 +142,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncAppendEntries* pMsg = pRpcMsg->pCont;
syncLogSendAppendEntries(pSyncNode, pMsg, "");
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
if (pMsg == NULL) {
sError("vgId:%d, sync-append-entries msg is NULL", pSyncNode->vgId);
return 0;
}
SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId);
if (pState == NULL) {
......@@ -150,8 +153,19 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
return 0;
}
// save index, otherwise pMsg will be free by rpc
SyncIndex saveLastSendIndex = pState->lastSendIndex;
bool update = false;
if (pMsg->dataLen > 0) {
pState->lastSendIndex = pMsg->prevLogIndex + 1;
saveLastSendIndex = pMsg->prevLogIndex + 1;
update = true;
}
syncLogSendAppendEntries(pSyncNode, pMsg, "");
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
if (update) {
pState->lastSendIndex = saveLastSendIndex;
pState->lastSendTime = taosGetTimestampMs();
}
......
......@@ -194,6 +194,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->pRaftStore->currentTerm;
// save error code, otherwise it will be overwritten
int32_t errCode = terrno;
......@@ -235,8 +236,8 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, pNode->pRaftStore->currentTerm, pNode->commitIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
......@@ -374,9 +375,9 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64
", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
}
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
......@@ -511,8 +512,8 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1), pMsg->commitIndex,
pMsg->dataLen, s);
}
......
......@@ -169,7 +169,7 @@ int tdbPCacheAlter(SPCache *pCache, int32_t nPage) {
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
SPage *pPage;
i32 nRef;
i32 nRef = 0;
tdbPCacheLock(pCache);
......@@ -178,14 +178,17 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
nRef = tdbRefPage(pPage);
}
ASSERT(pPage);
tdbPCacheUnlock(pCache);
// printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage, nRef);
tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef);
if (pPage) {
tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef);
} else {
tdbDebug("pcache/fetch page %p", pPage);
}
return pPage;
}
......@@ -266,7 +269,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
}
// 4. Try a create new page
if (!pPage) {
if (!pPage && pTxn->xMalloc != NULL) {
ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg);
if (ret < 0 || pPage == NULL) {
// TODO
......
......@@ -27,6 +27,116 @@ typedef struct {
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");
struct hashset_st {
size_t nbits;
size_t mask;
size_t capacity;
size_t *items;
size_t nitems;
double load_factor;
};
static const unsigned int prime = 39;
static const unsigned int prime2 = 5009;
hashset_t hashset_create(void) {
hashset_t set = tdbOsCalloc(1, sizeof(struct hashset_st));
if (!set) {
return NULL;
}
set->nbits = 4;
set->capacity = (size_t)(1 << set->nbits);
set->items = tdbOsCalloc(set->capacity, sizeof(size_t));
if (!set->items) {
tdbOsFree(set);
return NULL;
}
set->mask = set->capacity - 1;
set->nitems = 0;
set->load_factor = 0.75;
return set;
}
void hashset_destroy(hashset_t set) {
if (set) {
tdbOsFree(set->items);
tdbOsFree(set);
}
}
int hashset_add_member(hashset_t set, void *item) {
size_t value = (size_t) item;
size_t h;
if (value == 0) {
return -1;
}
for (h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
if (set->items[h] == value) {
return 0;
}
}
set->items[h] = value;
++set->nitems;
return 1;
}
int hashset_add(hashset_t set, void *item) {
int ret = hashset_add_member(set, item);
size_t old_capacity = set->capacity;
if (set->nitems >= (double)old_capacity * set->load_factor) {
size_t *old_items = set->items;
++set->nbits;
set->capacity = (size_t)(1 << set->nbits);
set->mask = set->capacity - 1;
set->items = tdbOsCalloc(set->capacity, sizeof(size_t));
if (!set->items) {
return -1;
}
set->nitems = 0;
for (size_t i = 0; i < old_capacity; ++i) {
hashset_add_member(set, (void*)old_items[i]);
}
tdbOsFree(old_items);
}
return ret;
}
int hashset_remove(hashset_t set, void *item) {
size_t value = (size_t) item;
for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
if (set->items[h] == value) {
set->items[h] = 0;
--set->nitems;
return 1;
}
}
return 0;
}
int hashset_contains(hashset_t set, void *item) {
size_t value = (size_t) item;
for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
if (set->items[h] == value) {
return 1;
}
}
return 0;
}
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
......@@ -209,12 +319,16 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);
// Write page to journal if neccessary
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))))) {
ret = tdbPagerWritePageToJournal(pPager, pPage);
if (ret < 0) {
tdbError("failed to write page to journal since %s", tstrerror(terrno));
return -1;
}
if (pPager->jPageSet) {
hashset_add(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
}
}
return 0;
......@@ -233,6 +347,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
return -1;
}
pPager->jPageSet = hashset_create();
// TODO: write the size of the file
pPager->inTran = 1;
......@@ -275,6 +390,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
if (pPager->jPageSet) {
hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
}
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
......@@ -304,6 +422,9 @@ int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
return -1;
}
if (pPager->jPageSet) {
hashset_destroy(pPager->jPageSet);
}
pPager->inTran = 0;
return 0;
......@@ -375,36 +496,61 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
return -1;
}
tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
if (jfd == NULL) {
return -1;
}
tdb_fd_t jfd = pPager->jfd;
ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
if (ret < 0) {
return -1;
}
// 1, read pages from jounal file
// 2, write original pages to buffered ones
u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
if (pageBuf == NULL) {
return -1;
}
/* TODO: reset the buffered pages instead of releasing them
// loop to reset the dirty pages from file
for (pgIdx = 0, pPage = pPager->pDirty; pPage != NULL && pgIndex < journalSize; pPage = pPage->pDirtyNext, ++pgIdx) {
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
// read pgno & the page from journal
SPgno pgno;
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
if (ret < 0) {
tdbOsFree(pageBuf);
return -1;
}
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
if (ret < 0) {
tdbOsFree(pageBuf);
return -1;
}
i64 offset = pPager->pageSize * (pgno - 1);
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
terrno = TAOS_SYSTEM_ERROR(errno);
tdbOsFree(pageBuf);
return -1;
}
ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
if (ret < 0) {
tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
pPager->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
tdbOsFree(pageBuf);
return -1;
}
}
*/
if (tdbOsFSync(pPager->fd) < 0) {
tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
tdbOsFree(pageBuf);
return -1;
}
tdbOsFree(pageBuf);
// 3, release the dirty pages
SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
SRBTreeNode *pNode = NULL;
......@@ -414,6 +560,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
......@@ -422,11 +569,48 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
// 4, remove the journal file
tdbOsClose(pPager->jfd);
(void)tdbOsRemove(pPager->jFileName);
hashset_destroy(pPager->jPageSet);
pPager->inTran = 0;
return 0;
}
int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
SPage *pPage;
int ret;
// loop to write the dirty pages to file
SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
SRBTreeNode *pNode = NULL;
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
pPage = (SPage *)pNode;
ret = tdbPagerWritePageToDB(pPager, pPage);
if (ret < 0) {
tdbError("failed to write page to db since %s", tstrerror(terrno));
return -1;
}
}
tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
pPager->dbOrigSize = pPager->dbFileSize;
// release the page
iter = tRBTreeIterCreate(&pPager->rbt, 1);
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
pPage = (SPage *)pNode;
pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
tRBTreeCreate(&pPager->rbt, pageCmpFn);
return 0;
}
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
TXN *pTxn) {
SPage *pPage;
......@@ -453,10 +637,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
// fetch a page container
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = pgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
if (pPage == NULL) {
ASSERT(0);
return -1;
while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn)) == NULL) {
tdbPagerFlushPage(pPager, pTxn);
}
tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
......
......@@ -384,6 +384,8 @@ struct STDB {
#endif
};
typedef struct hashset_st *hashset_t;
struct SPager {
char *dbFileName;
char *jFileName;
......@@ -394,7 +396,8 @@ struct SPager {
SPCache *pCache;
SPgno dbFileSize;
SPgno dbOrigSize;
SPage *pDirty;
//SPage *pDirty;
hashset_t jPageSet;
SRBTree rbt;
u8 inTran;
SPager *pNext; // used by TDB
......
......@@ -124,8 +124,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
goto _err;
}
char* candidate = NULL;
char* haystack = buf;
char* candidate = NULL;
char* haystack = buf;
int64_t pos = 0;
SWalCkHead* logContent = NULL;
......@@ -414,8 +414,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
ASSERT(pFileInfo->fileSize == 0);
// remove the empty wal log, and its idx
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr);
// remove its meta entry
taosArrayRemove(pWal->fileInfoSet, fileIdx);
......
......@@ -319,7 +319,7 @@ int32_t walEndSnapshot(SWal *pWal) {
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
if (pInfo) {
if (ver >= pInfo->lastVer) {
pInfo--;
pInfo++;
}
if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) {
wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
......@@ -407,6 +407,7 @@ int32_t walRollImpl(SWal *pWal) {
}
walBuildLogName(pWal, newFileFirstVer, fnameStr);
pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
code = -1;
......
......@@ -271,8 +271,11 @@ static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType
cfgStypeStr(stype), value, terrstr());
return -1;
}
pItem->stype = stype;
// apply new timezone
osSetTimezone(value);
return 0;
}
......
......@@ -9,7 +9,7 @@
,,y,script,./test.sh -f tsim/user/basic.sim
,,y,script,./test.sh -f tsim/user/password.sim
,,y,script,./test.sh -f tsim/user/privilege_db.sim
,,,script,./test.sh -f tsim/user/privilege_sysinfo.sim
,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
,,,script,./test.sh -f tsim/db/alter_option.sim
,,,script,./test.sh -f tsim/db/alter_replica_13.sim
,,,script,./test.sh -f tsim/db/alter_replica_31.sim
......@@ -23,16 +23,16 @@
,,,script,./test.sh -f tsim/db/create_all_options.sim
,,y,script,./test.sh -f tsim/db/delete_reuse1.sim
,,y,script,./test.sh -f tsim/db/delete_reuse2.sim
,,,script,./test.sh -f tsim/db/delete_reusevnode.sim
,,y,script,./test.sh -f tsim/db/delete_reusevnode.sim
,,y,script,./test.sh -f tsim/db/delete_reusevnode2.sim
,,,script,./test.sh -f tsim/db/delete_writing1.sim
,,,script,./test.sh -f tsim/db/delete_writing2.sim
,,y,script,./test.sh -f tsim/db/delete_writing1.sim
,,y,script,./test.sh -f tsim/db/delete_writing2.sim
,,y,script,./test.sh -f tsim/db/error1.sim
,,y,script,./test.sh -f tsim/db/keep.sim
,,y,script,./test.sh -f tsim/db/len.sim
,,y,script,./test.sh -f tsim/db/repeat.sim
,,y,script,./test.sh -f tsim/db/show_create_db.sim
,,,script,./test.sh -f tsim/db/show_create_table.sim
,,y,script,./test.sh -f tsim/db/show_create_table.sim
,,y,script,./test.sh -f tsim/db/tables.sim
,,y,script,./test.sh -f tsim/db/taosdlog.sim
,,,script,./test.sh -f tsim/dnode/balance_replica1.sim
......@@ -68,7 +68,7 @@
,,y,script,./test.sh -f tsim/insert/basic0.sim
,,y,script,./test.sh -f tsim/insert/basic1.sim
,,y,script,./test.sh -f tsim/insert/basic2.sim
,,,script,./test.sh -f tsim/insert/commit-merge0.sim
,,y,script,./test.sh -f tsim/insert/commit-merge0.sim
,,y,script,./test.sh -f tsim/insert/insert_drop.sim
,,y,script,./test.sh -f tsim/insert/insert_select.sim
,,y,script,./test.sh -f tsim/insert/null.sim
......@@ -80,8 +80,8 @@
,,y,script,./test.sh -f tsim/insert/query_multi_file.sim
,,y,script,./test.sh -f tsim/insert/tcp.sim
,,y,script,./test.sh -f tsim/insert/update0.sim
,,,script,./test.sh -f tsim/insert/update1_sort_merge.sim
,,,script,./test.sh -f tsim/parser/alter__for_community_version.sim
,,y,script,./test.sh -f tsim/insert/update1_sort_merge.sim
,,y,script,./test.sh -f tsim/parser/alter__for_community_version.sim
,,y,script,./test.sh -f tsim/parser/alter_column.sim
,,y,script,./test.sh -f tsim/parser/alter_stable.sim
,,y,script,./test.sh -f tsim/parser/alter.sim
......@@ -92,24 +92,24 @@
,,y,script,./test.sh -f tsim/parser/binary_escapeCharacter.sim
,,,script,./test.sh -f tsim/parser/col_arithmetic_operation.sim
,,y,script,./test.sh -f tsim/parser/columnValue_bigint.sim
,,,script,./test.sh -f tsim/parser/columnValue_bool.sim
,,y,script,./test.sh -f tsim/parser/columnValue_bool.sim
,,y,script,./test.sh -f tsim/parser/columnValue_double.sim
,,y,script,./test.sh -f tsim/parser/columnValue_float.sim
,,y,script,./test.sh -f tsim/parser/columnValue_int.sim
,,y,script,./test.sh -f tsim/parser/columnValue_smallint.sim
,,y,script,./test.sh -f tsim/parser/columnValue_tinyint.sim
,,,script,./test.sh -f tsim/parser/columnValue_unsign.sim
,,,script,./test.sh -f tsim/parser/commit.sim
,,,script,./test.sh -f tsim/parser/condition.sim
,,y,script,./test.sh -f tsim/parser/commit.sim
,,y,script,./test.sh -f tsim/parser/condition.sim
,,y,script,./test.sh -f tsim/parser/constCol.sim
,,,script,./test.sh -f tsim/parser/create_db.sim
,,,script,./test.sh -f tsim/parser/create_mt.sim
,,y,script,./test.sh -f tsim/parser/create_db.sim
,,y,script,./test.sh -f tsim/parser/create_mt.sim
,,y,script,./test.sh -f tsim/parser/create_tb_with_tag_name.sim
,,y,script,./test.sh -f tsim/parser/create_tb.sim
,,,script,./test.sh -f tsim/parser/dbtbnameValidate.sim
,,y,script,./test.sh -f tsim/parser/dbtbnameValidate.sim
,,y,script,./test.sh -f tsim/parser/distinct.sim
,,y,script,./test.sh -f tsim/parser/fill_us.sim
,,,script,./test.sh -f tsim/parser/fill.sim
,,y,script,./test.sh -f tsim/parser/fill.sim
,,y,script,./test.sh -f tsim/parser/first_last.sim
,,y,script,./test.sh -f tsim/parser/fourArithmetic-basic.sim
,,,script,./test.sh -f tsim/parser/function.sim
......@@ -120,9 +120,9 @@
,,y,script,./test.sh -f tsim/parser/import_commit1.sim
,,y,script,./test.sh -f tsim/parser/import_commit2.sim
,,y,script,./test.sh -f tsim/parser/import_commit3.sim
,,,script,./test.sh -f tsim/parser/import_file.sim
,,y,script,./test.sh -f tsim/parser/import_file.sim
,,y,script,./test.sh -f tsim/parser/import.sim
,,,script,./test.sh -f tsim/parser/insert_multiTbl.sim
,,y,script,./test.sh -f tsim/parser/insert_multiTbl.sim
,,y,script,./test.sh -f tsim/parser/insert_tb.sim
,,,script,./test.sh -f tsim/parser/join_manyblocks.sim
,,y,script,./test.sh -f tsim/parser/join_multitables.sim
......@@ -131,14 +131,14 @@
,,y,script,./test.sh -f tsim/parser/last_cache.sim
,,y,script,./test.sh -f tsim/parser/last_groupby.sim
,,y,script,./test.sh -f tsim/parser/lastrow.sim
,,,script,./test.sh -f tsim/parser/lastrow2.sim
,,y,script,./test.sh -f tsim/parser/lastrow2.sim
,,y,script,./test.sh -f tsim/parser/like.sim
,,,script,./test.sh -f tsim/parser/limit.sim
,,,script,./test.sh -f tsim/parser/limit1.sim
,,y,script,./test.sh -f tsim/parser/mixed_blocks.sim
,,,script,./test.sh -f tsim/parser/nchar.sim
,,y,script,./test.sh -f tsim/parser/nchar.sim
,,,script,./test.sh -f tsim/parser/nestquery.sim
,,,script,./test.sh -f tsim/parser/null_char.sim
,,y,script,./test.sh -f tsim/parser/null_char.sim
,,y,script,./test.sh -f tsim/parser/precision_ns.sim
,,,script,./test.sh -f tsim/parser/projection_limit_offset.sim
,,y,script,./test.sh -f tsim/parser/regex.sim
......@@ -146,8 +146,8 @@
,,y,script,./test.sh -f tsim/parser/select_distinct_tag.sim
,,y,script,./test.sh -f tsim/parser/select_from_cache_disk.sim
,,y,script,./test.sh -f tsim/parser/select_with_tags.sim
,,,script,./test.sh -f tsim/parser/selectResNum.sim
,,,script,./test.sh -f tsim/parser/set_tag_vals.sim
,,y,script,./test.sh -f tsim/parser/selectResNum.sim
,,y,script,./test.sh -f tsim/parser/set_tag_vals.sim
,,y,script,./test.sh -f tsim/parser/single_row_in_tb.sim
,,,script,./test.sh -f tsim/parser/sliding.sim
,,y,script,./test.sh -f tsim/parser/slimit_alter_tags.sim
......@@ -178,24 +178,24 @@
,,,script,./test.sh -f tsim/mnode/basic3.sim
,,,script,./test.sh -f tsim/mnode/basic4.sim
,,,script,./test.sh -f tsim/mnode/basic5.sim
,,,script,./test.sh -f tsim/show/basic.sim
,,y,script,./test.sh -f tsim/show/basic.sim
,,y,script,./test.sh -f tsim/table/autocreate.sim
,,y,script,./test.sh -f tsim/table/basic1.sim
,,,script,./test.sh -f tsim/table/basic2.sim
,,y,script,./test.sh -f tsim/table/basic2.sim
,,y,script,./test.sh -f tsim/table/basic3.sim
,,y,script,./test.sh -f tsim/table/bigint.sim
,,y,script,./test.sh -f tsim/table/binary.sim
,,y,script,./test.sh -f tsim/table/bool.sim
,,,script,./test.sh -f tsim/table/column_name.sim
,,y,script,./test.sh -f tsim/table/column_name.sim
,,y,script,./test.sh -f tsim/table/column_num.sim
,,y,script,./test.sh -f tsim/table/column_value.sim
,,y,script,./test.sh -f tsim/table/column2.sim
,,y,script,./test.sh -f tsim/table/createmulti.sim
,,y,script,./test.sh -f tsim/table/date.sim
,,y,script,./test.sh -f tsim/table/db.table.sim
,,,script,./test.sh -f tsim/table/delete_reuse1.sim
,,,script,./test.sh -f tsim/table/delete_reuse2.sim
,,,script,./test.sh -f tsim/table/delete_writing.sim
,,y,script,./test.sh -f tsim/table/delete_reuse1.sim
,,y,script,./test.sh -f tsim/table/delete_reuse2.sim
,,y,script,./test.sh -f tsim/table/delete_writing.sim
,,y,script,./test.sh -f tsim/table/describe.sim
,,y,script,./test.sh -f tsim/table/double.sim
,,y,script,./test.sh -f tsim/table/float.sim
......@@ -203,11 +203,12 @@
,,y,script,./test.sh -f tsim/table/int.sim
,,y,script,./test.sh -f tsim/table/limit.sim
,,y,script,./test.sh -f tsim/table/smallint.sim
,,,script,./test.sh -f tsim/table/table_len.sim
,,y,script,./test.sh -f tsim/table/table_len.sim
,,y,script,./test.sh -f tsim/table/table.sim
,,y,script,./test.sh -f tsim/table/tinyint.sim
,,y,script,./test.sh -f tsim/table/vgroup.sim
,,,script,./test.sh -f tsim/stream/basic0.sim -g
,,,script,./test.sh -f tsim/stream/basic1.sim
,,y,script,./test.sh -f tsim/stream/basic2.sim
,,,script,./test.sh -f tsim/stream/drop_stream.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim
......@@ -271,7 +272,7 @@
,,y,script,./test.sh -f tsim/stable/tag_filter.sim
,,y,script,./test.sh -f tsim/stable/tag_modify.sim
,,y,script,./test.sh -f tsim/stable/tag_rename.sim
,,,script,./test.sh -f tsim/stable/values.sim
,,y,script,./test.sh -f tsim/stable/values.sim
,,y,script,./test.sh -f tsim/stable/vnode3.sim
,,y,script,./test.sh -f tsim/stable/metrics_idx.sim
,,,script,./test.sh -f tsim/sma/drop_sma.sim
......@@ -325,7 +326,7 @@
,,y,script,./test.sh -f tsim/compute/bottom.sim
,,y,script,./test.sh -f tsim/compute/count.sim
,,y,script,./test.sh -f tsim/compute/diff.sim
,,,script,./test.sh -f tsim/compute/diff2.sim
,,y,script,./test.sh -f tsim/compute/diff2.sim
,,y,script,./test.sh -f tsim/compute/first.sim
,,y,script,./test.sh -f tsim/compute/interval.sim
,,y,script,./test.sh -f tsim/compute/last_row.sim
......@@ -358,7 +359,7 @@
,,y,script,./test.sh -f tsim/vector/metrics_query.sim
,,y,script,./test.sh -f tsim/vector/metrics_tag.sim
,,y,script,./test.sh -f tsim/vector/metrics_time.sim
,,,script,./test.sh -f tsim/vector/multi.sim
,,y,script,./test.sh -f tsim/vector/multi.sim
,,y,script,./test.sh -f tsim/vector/single.sim
,,y,script,./test.sh -f tsim/vector/table_field.sim
,,y,script,./test.sh -f tsim/vector/table_mix.sim
......@@ -380,7 +381,7 @@
,,y,script,./test.sh -f tsim/tag/column.sim
,,y,script,./test.sh -f tsim/tag/commit.sim
,,y,script,./test.sh -f tsim/tag/create.sim
,,,script,./test.sh -f tsim/tag/delete.sim
,,y,script,./test.sh -f tsim/tag/delete.sim
,,y,script,./test.sh -f tsim/tag/double.sim
,,y,script,./test.sh -f tsim/tag/filter.sim
,,y,script,./test.sh -f tsim/tag/float.sim
......@@ -392,7 +393,7 @@
,,y,script,./test.sh -f tsim/tag/tinyint.sim
,,y,script,./test.sh -f tsim/tag/drop_tag.sim
,,y,script,./test.sh -f tsim/tag/tbNameIn.sim
,,,script,./test.sh -f tmp/monitor.sim
,,y,script,./test.sh -f tmp/monitor.sim
#system test
......
......@@ -20,7 +20,7 @@ LOG_DIR=$TAOS_DIR/sim/tsim/asan
error_num=`cat ${LOG_DIR}/*.asan | grep "ERROR" | wc -l`
memory_leak=`cat ${LOG_DIR}/*.asan | grep "Direct leak" | wc -l`
indirect_leak=`cat ${LOG_DIR}/*.asan | grep "Indirect leak" | wc -l`
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | wc -l`
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | wc -l`
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
......
......@@ -87,6 +87,7 @@ ELSE ()
MESSAGE("CURRENT SOURCE DIR ${CMAKE_CURRENT_SOURCE_DIR}")
IF (TD_WINDOWS)
MESSAGE("Building taosAdapter on Windows")
INCLUDE(ExternalProject)
ExternalProject_Add(taosadapter
PREFIX "taosadapter"
......@@ -104,14 +105,18 @@ ELSE ()
COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
INSTALL_COMMAND
COMMAND cmake -E time upx taosadapter ||:
COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin
COMMAND cmake -E echo "Comparessing taosadapter.exe"
COMMAND cmake -E time upx taosadapter.exe
COMMAND cmake -E echo "Copy taosadapter.exe"
COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin/taosadapter.exe
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E echo "Copy taosadapter.toml"
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E echo "Copy taosadapter-debug.exe"
COMMAND cmake -E copy taosadapter-debug.exe ${CMAKE_BINARY_DIR}/build/bin
)
ELSE (TD_WINDOWS)
MESSAGE("Building taosAdapter on non-Windows")
INCLUDE(ExternalProject)
ExternalProject_Add(taosadapter
PREFIX "taosadapter"
......@@ -126,11 +131,15 @@ ELSE ()
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
INSTALL_COMMAND
COMMAND cmake -E echo "Comparessing taosadapter.exe"
COMMAND upx taosadapter || :
COMMAND cmake -E echo "Copy taosadapter"
COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E echo "Copy taosadapter.toml"
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E echo "Copy taosadapter-debug"
COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
)
ENDIF (TD_WINDOWS)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册