......@@ -87,6 +87,7 @@ TDengine系统后台服务由taosd提供,可以在配置文件taos.cfg里修
- httpPort: RESTful服务使用的端口号,所有的HTTP请求(TCP)都需要向该接口发起查询/写入请求。
- dataDir: 数据文件目录,所有的数据文件都将写入该目录。默认值:/var/lib/taos。
- logDir:日志文件目录,客户端和服务器的运行日志文件将写入该目录。默认值:/var/log/taos。
- tempDir:临时文件目录,客户端和服务器的临时文件(主要是查询时用于保存中间结果的问题)将写入该目录。 默认值:Linux下为 /tmp/,Windows下为环境变量 tmp 或 temp 指向的目录。
- arbitrator:系统中裁决器的end point, 缺省值为空。
- role:dnode的可选角色。0-any; 既可作为mnode,也可分配vnode;1-mgmt;只能作为mnode,不能分配vnode;2-dnode;不能作为mnode,只能分配vnode
- debugFlag:运行日志开关。131(输出错误和警告日志),135( 输出错误、警告和调试日志),143( 输出错误、警告、调试和跟踪日志)。默认值:131或135(不同模块有不同的默认值)。
......@@ -844,7 +844,7 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
SELECT PERCENTILE(field_name, P) FROM { tb_name | stb_name } [WHERE clause];
SELECT PERCENTILE(field_name, P) FROM { tb_name } [WHERE clause];
返回结果数据类型: 双精度浮点数Double。
......@@ -35,7 +35,7 @@ TDengine相对于通用数据库,有超高的压缩比,在绝大多数场景
Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44851T。TDengine大概需要消耗44851/5=8970T, 8.9P空间。
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44.8512T。TDengine大概需要消耗44.851/5=8.97024T空间。
......@@ -616,6 +616,43 @@ HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间
- httpEnableCompress: 是否支持压缩,默认不支持,目前TDengine仅支持gzip压缩格式
- httpDebugFlag: 日志开关,131:仅错误和报警信息,135:调试信息,143:非常详细的调试信息,默认131
## CSharp Connector
#### 安装TDengine客户端
- TDengineDriver.cs 调用taos.dll文件的Native C方法
- TDengineTest.cs 参考程序示例
#### 使用方法
- 将C#接口文件TDengineDriver.cs加入到应用程序所在.NET项目中
- 参考TDengineTest.cs来定义数据库连接参数,及执行数据插入、查询等操作的方法
- 因为C#接口需要用到`taos.dll`文件,用户可以将`taos.dll`文件加入.NET解决方案中
#### 注意事项
- `taos.dll`文件使用x64平台编译,所以.NET项目在生成.exe文件时,“解决方案”/“项目”的“平台”请均选择“x64”。
- 此.NET接口目前已经在Visual Studio 2013/2015/2017中验证过,其它VS版本尚待验证。
#### 第三方驱动
## Go Connector
......@@ -38,9 +38,9 @@
6. 检查防火墙设置,确认TCP/UDP 端口6030-6042 是打开的
7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/lib/taos*里, 并且*/usr/local/lib/taos*在系统库函数搜索路径*LD_LIBRARY_PATH*
7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/taos/driver*里, 并且*/usr/local/taos/driver*在系统库函数搜索路径*LD_LIBRARY_PATH*
8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*driver/c/taos.dll*在你的系统搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*C:\TDengine\driver\taos.dll*在你的系统库函数搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
9. 如果仍不能排除连接故障,请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作:`nc -vuz {hostIP} {port} `
......@@ -20,6 +20,9 @@
# data file's directory
# dataDir /var/lib/taos
# temporary file's directory
# tempDir /tmp/
# the arbitrator's fully qualified domain name (FQDN) for TDengine system, for cluster only
# arbitrator arbitrator_hostname:6042
......@@ -256,3 +259,5 @@
# maximum display width of binary and nchar fields in the shell. The parts exceeding this limit will be hidden
# maxBinaryDisplayWidth 30
# enable/disable telemetry reporting
# telemetryReporting 1
\ No newline at end of file
......@@ -121,8 +121,11 @@ function install_config() {
echo -e -n "${GREEN}Enter FQDN:port (like h1.taosdata.com:6030) of an existing TDengine cluster node to join${NC}"
echo -e -n "${GREEN}OR leave it blank to build one${NC}:"
read firstEp
while true; do
#read firstEp
if exec < /dev/tty; then
read firstEp;
while true; do
if [ ! -z "$firstEp" ]; then
# check the format of the firstEp
#if [[ $firstEp == $FQDN_PATTERN ]]; then
......@@ -64,9 +64,8 @@ typedef struct SLocalReducer {
SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo
tFilePage * discardData;
SResultInfo * pResInfo;
char* pFinalRes; // result data after interpo
tFilePage* discardData;
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
......@@ -23,7 +23,7 @@ extern "C" {
#include "tscUtil.h"
#include "tsclient.h"
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscFetchDatablockForSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
......@@ -228,6 +228,7 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
......@@ -107,9 +107,6 @@ SSchema tscGetTbnameColumnSchema();
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size);
//todo tags value as well as the table id structure needs refactor
char *tsGetTagsValue(STableMeta *pMeta);
#ifdef __cplusplus
......@@ -339,9 +339,9 @@ typedef struct STscObj {
} STscObj;
typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
typedef struct SSqlObj {
......@@ -431,14 +431,6 @@ void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
void tscFreeSqlResult(SSqlObj *pSql);
* only free part of resources allocated during query.
* TODO remove it later
* Note: this function is multi-thread safe.
* @param pObj
void tscPartiallyFreeSqlObj(SSqlObj *pSql);
* free sql object, release allocated resource
* @param pObj
......@@ -523,7 +515,6 @@ extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
int32_t tscCompareTidTags(const void* p1, const void* p2);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus
......@@ -176,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
} else {
......@@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
// handle the sub queries of join query
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
} else if (pRes->completed) {
if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
......@@ -49,82 +49,6 @@ typedef struct SCreateBuilder {
} SCreateBuilder;
static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength);
static int32_t getToStringLength(const char *pData, int32_t length, int32_t type) {
char buf[512] = {0};
int32_t len = 0;
int32_t MAX_BOOL_TYPE_LENGTH = 5; // max(strlen("true"), strlen("false"));
switch (type) {
return length;
return length;
double dv = 0;
dv = GET_DOUBLE_VAL(pData);
len = sprintf(buf, "%lf", dv);
if (strncasecmp("nan", buf, 3) == 0) {
len = 4;
} break;
float fv = 0;
fv = GET_FLOAT_VAL(pData);
len = sprintf(buf, "%f", fv);
if (strncasecmp("nan", buf, 3) == 0) {
len = 4;
} break;
len = sprintf(buf, "%" PRId64, *(int64_t *)pData);
len = sprintf(buf, "%d", *(int32_t *)pData);
return len;
* we need to convert all data into string, so we need to sprintf all kinds of
* non-string data into string, and record its length to get the right
* maximum length. The length may be less or greater than its original binary length:
* For example:
* length((short) 1) == 1, less than sizeof(short)
* length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t)
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
return 0;
char * pTagValue = tsGetTagsValue(pMeta);
SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
int32_t len = getToStringLength(pTagValue, pTagsSchema[0].bytes, pTagsSchema[0].type);
pTagValue += pTagsSchema[0].bytes;
int32_t numOfTags = tscGetNumOfTags(pMeta);
for (int32_t i = 1; i < numOfTags; ++i) {
int32_t tLen = getToStringLength(pTagValue, pTagsSchema[i].bytes, pTagsSchema[i].type);
if (len < tLen) {
len = tLen;
pTagValue += pTagsSchema[i].bytes;
return len;
static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
......@@ -186,8 +110,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
return 0;
// the following is handle display tags value for meters created according to metric
char *pTagValue = tsGetTagsValue(pMeta);
// the following is handle display tags for table created according to super table
for (int32_t i = numOfRows; i < totalNumOfRows; ++i) {
// field name
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
......@@ -219,8 +142,6 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
const char *src = "TAG";
STR_WITH_MAXSIZE_TO_VARSTR(target, src, pField->bytes);
pTagValue += pSchema[i].bytes;
return 0;
......@@ -286,10 +207,10 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
const int32_t TYPE_COLUMN_LENGTH = 16;
const int32_t NOTE_COLUMN_MIN_LENGTH = 8;
int32_t noteFieldLen = tscMaxLengthOfTagsFields(pSql);
if (noteFieldLen == 0) {
int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH;//tscMaxLengthOfTagsFields(pSql);
// if (noteFieldLen == 0) {
// }
int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen);
......@@ -99,12 +99,9 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
SResultInfo *pResInfo = &pReducer->pResInfo[i];
pResInfo->bufLen = pExpr->interBytes;
pResInfo->interResultBuf = calloc(1, (size_t) pResInfo->bufLen);
pCtx->resultInfo = &pReducer->pResInfo[i];
pCtx->resultInfo->superTableQ = true;
pCtx->interBufBytes = pExpr->interBytes;
pCtx->resultInfo = calloc(1, pCtx->interBufBytes + sizeof(SResultRowCellInfo));
pCtx->stableQuery = true;
int16_t n = 0;
......@@ -345,7 +342,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->num = 0;
pReducer->pResInfo = calloc(numOfCols, sizeof(SResultInfo));
tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pReducer, pDesc);
......@@ -489,13 +485,15 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tscDebug("%p waiting for delete procedure, status: %d", pSql, status);
pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo);
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
if (pCtx->tagInfo.pTagCtxList != NULL) {
......@@ -509,15 +507,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
if (pLocalReducer->pResInfo != NULL) {
size_t num = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < num; ++i) {
if (pLocalReducer->pLoserTree) {
......@@ -1072,7 +1061,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (maxOutput < pResInfo->numOfRes) {
maxOutput = pResInfo->numOfRes;
......@@ -65,7 +65,6 @@ static bool validateTagParams(tFieldList* pTagsList, tFieldList* pFieldList, SSq
static int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStrToken* tableName, int32_t* len);
static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLength);
static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName);
static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult);
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
......@@ -80,9 +79,9 @@ static void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo);
static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd);
static int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseSlidingClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExprItem* pItem);
......@@ -616,11 +615,13 @@ static bool isTopBottomQuery(SQueryInfo* pQueryInfo) {
return false;
int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
const char* msg1 = "invalid query expression";
const char* msg2 = "interval cannot be less than 10 ms";
const char* msg3 = "sliding cannot be used without interval";
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -660,7 +661,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
if (parseSlidingClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
if (parseSlidingClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
......@@ -713,7 +714,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
if (parseSlidingClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
if (parseSlidingClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
......@@ -771,13 +772,15 @@ int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQue
int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t parseSlidingClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
const char* msg0 = "sliding value too small";
const char* msg1 = "sliding value no larger than the interval value";
const char* msg2 = "sliding value can not less than 1% of interval value";
const char* msg3 = "does not support sliding when interval is natual month/year";
const char* msg3 = "does not support sliding when interval is natural month/year";
const char* msg4 = "sliding not support yet in ordinary query";
const static int32_t INTERVAL_SLIDING_FACTOR = 100;
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -810,6 +813,10 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
if (pQueryInfo->interval.sliding != pQueryInfo->interval.interval && pSql->pStream == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
......@@ -1215,7 +1222,7 @@ static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex*
tscColumnListInsert(pQueryInfo->colList, &tsCol);
static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t exprIndex, tSQLExprItem* pItem) {
const char* msg1 = "invalid column name, or illegal column type";
const char* msg1 = "invalid column name, illegal column type, or columns in arithmetic expression from two tables";
const char* msg2 = "invalid arithmetic expression in select clause";
const char* msg3 = "tag columns can not be used in arithmetic expression";
const char* msg4 = "columns from different table mixed up in arithmetic expression";
......@@ -1629,16 +1636,15 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc,
char* aliasName, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
const char* name, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
const char* msg1 = "not support column types";
int16_t type = 0;
int16_t bytes = 0;
char columnName[TSDB_COL_NAME_LEN] = {0};
int32_t functionID = cvtFunc.execFuncId;
if (functionID == TSDB_FUNC_SPREAD) {
int32_t t1 = pSchema[pColIndex->columnIndex].type;
int32_t t1 = pSchema->type;
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
return -1;
......@@ -1647,18 +1653,12 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
bytes = tDataTypeDesc[type].nSize;
} else {
type = pSchema[pColIndex->columnIndex].type;
bytes = pSchema[pColIndex->columnIndex].bytes;
type = pSchema->type;
bytes = pSchema->bytes;
if (aliasName != NULL) {
tstrncpy(columnName, aliasName, sizeof(columnName));
} else {
getRevisedName(columnName, cvtFunc.originFuncId, sizeof(columnName) - 1, pSchema[pColIndex->columnIndex].name);
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false);
tstrncpy(pExpr->aliasName, columnName, sizeof(pExpr->aliasName));
tstrncpy(pExpr->aliasName, name, tListLen(pExpr->aliasName));
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) {
pExpr->colInfo.flag |= TSDB_COL_NULL;
......@@ -1678,7 +1678,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
// if it is not in the final result, do not add it
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
if (finalResult) {
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr);
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, pExpr->aliasName, pExpr);
} else {
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]));
......@@ -1686,6 +1686,23 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
void setResultColName(char* name, tSQLExprItem* pItem, int32_t functionId, SStrToken* pToken) {
if (pItem->aliasName != NULL) {
tstrncpy(name, pItem->aliasName, TSDB_COL_NAME_LEN);
} else {
char uname[TSDB_COL_NAME_LEN] = {0};
int32_t len = MIN(pToken->n + 1, TSDB_COL_NAME_LEN);
tstrncpy(uname, pToken->z, len);
int32_t size = TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].aName) + 2 + 1;
char tmp[TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].aName) + 2 + 1] = {0};
snprintf(tmp, size, "%s(%s)", aAggs[functionId].aName, uname);
tstrncpy(name, tmp, TSDB_COL_NAME_LEN);
int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult) {
STableMetaInfo* pTableMetaInfo = NULL;
int32_t optr = pItem->pNode->nSQLOptr;
......@@ -1943,8 +1960,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (pParamElem->pNode->nSQLOptr == TK_ALL) {
// select table.*
if (pParamElem->pNode->nSQLOptr == TK_ALL) { // select table.*
SStrToken tmpToken = pParamElem->pNode->colInfo;
if (getTableIndexByName(&tmpToken, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
......@@ -1954,9 +1970,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
char name[TSDB_COL_NAME_LEN] = {0};
for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) {
index.columnIndex = j;
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index, finalResult) != 0) {
SStrToken t = {.z = pSchema[j].name, .n = (uint32_t)strnlen(pSchema[j].name, TSDB_COL_NAME_LEN)};
setResultColName(name, pItem, cvtFunc.originFuncId, &t);
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult) != 0) {
......@@ -1967,14 +1987,18 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
// functions can not be applied to tags
if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index, finalResult) != 0) {
char name[TSDB_COL_NAME_LEN] = {0};
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
setResultColName(name, pItem, cvtFunc.originFuncId, &pParamElem->pNode->colInfo);
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex + i, &index, finalResult) != 0) {
......@@ -2011,7 +2035,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) {
SColumnIndex index = {.tableIndex = j, .columnIndex = i};
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index, finalResult) != 0) {
char name[TSDB_COL_NAME_LEN] = {0};
SStrToken t = {.z = pSchema->name, .n = (uint32_t)strnlen(pSchema->name, TSDB_COL_NAME_LEN)};
setResultColName(name, pItem, cvtFunc.originFuncId, &t);
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[index.columnIndex], cvtFunc, name, colIndex, &index, finalResult) != 0) {
......@@ -2244,10 +2273,6 @@ void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLengt
void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName) {
snprintf(resultFieldName, maxLen, "%s(%s)", aAggs[functionId].aName, columnName);
static bool isTablenameToken(SStrToken* token) {
SStrToken tmpToken = *token;
SStrToken tableToken = {0};
......@@ -2732,6 +2757,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) {
int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd) {
const char* msg1 = "too many columns in group by clause";
const char* msg2 = "invalid column name in group by clause";
const char* msg3 = "columns from one table allowed as group by columns";
const char* msg7 = "not support group by expression";
const char* msg8 = "not allowed column type for group by";
const char* msg9 = "tags not allowed for table query";
......@@ -2767,7 +2793,11 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
tableIndex = index.tableIndex;
if (tableIndex == COLUMN_INDEX_INITIAL_VAL) {
tableIndex = index.tableIndex;
} else if (tableIndex != index.tableIndex) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -3352,7 +3382,8 @@ int32_t doArithmeticExprToString(tSQLExpr* pExpr, char** exprString) {
static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList, int32_t* type) {
static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList,
int32_t* type, uint64_t* uid) {
if (pExpr->nSQLOptr == TK_ID) {
if (*type == NON_ARITHMEIC_EXPR) {
......@@ -3401,13 +3432,22 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQuer
// Not supported data type in arithmetic expression
uint64_t id = -1;
for(int32_t i = 0; i < inc; ++i) {
SSqlExpr* p1 = tscSqlExprGet(pQueryInfo, i + outputIndex);
int16_t t = p1->resType;
if (i == 0) {
id = p1->uid;
} else if (id != p1->uid){
*uid = id;
......@@ -3419,13 +3459,16 @@ static int32_t validateArithmeticSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryI
tSQLExpr* pLeft = pExpr->pLeft;
uint64_t uidLeft = 0;
uint64_t uidRight = 0;
if (pLeft->nSQLOptr >= TK_PLUS && pLeft->nSQLOptr <= TK_REM) {
int32_t ret = validateArithmeticSQLExpr(pCmd, pLeft, pQueryInfo, pList, type);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
} else {
int32_t ret = validateSQLExpr(pCmd, pLeft, pQueryInfo, pList, type);
int32_t ret = validateSQLExpr(pCmd, pLeft, pQueryInfo, pList, type, &uidLeft);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
......@@ -3438,10 +3481,15 @@ static int32_t validateArithmeticSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryI
return ret;
} else {
int32_t ret = validateSQLExpr(pCmd, pRight, pQueryInfo, pList, type);
int32_t ret = validateSQLExpr(pCmd, pRight, pQueryInfo, pList, type, &uidRight);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
// the expression not from the same table, return error
if (uidLeft != uidRight && uidLeft != 0 && uidRight != 0) {
......@@ -4427,8 +4475,8 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema* pSchema) {
const char* msg0 = "only support order by primary timestamp";
const char* msg1 = "invalid column name";
const char* msg2 = "only support order by primary timestamp and queried column";
const char* msg3 = "only support order by primary timestamp and first tag in groupby clause";
const char* msg2 = "only support order by primary timestamp or queried column";
const char* msg3 = "only support order by primary timestamp or first tag in groupby clause";
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -6103,7 +6151,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
// set interval value
if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
if (parseIntervalClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
} else {
if ((pQueryInfo->interval.interval > 0) &&
......@@ -6311,7 +6359,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
// set interval value
if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
if (parseIntervalClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
} else {
if ((pQueryInfo->interval.interval > 0) &&
......@@ -197,28 +197,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
return pTableMeta;
* the TableMeta data format in memory is as follows:
* +--------------------+
* |STableMeta Body data| sizeof(STableMeta)
* +--------------------+
* |Schema data | numOfTotalColumns * sizeof(SSchema)
* +--------------------+
* |Tags data | tag_col_1.bytes + tag_col_2.bytes + ....
* +--------------------+
* @param pTableMeta
* @return
char* tsGetTagsValue(STableMeta* pTableMeta) {
int32_t offset = 0;
// int32_t numOfTotalCols = pTableMeta->numOfColumns + pTableMeta->numOfTags;
// uint32_t offset = sizeof(STableMeta) + numOfTotalCols * sizeof(SSchema);
return ((char*)pTableMeta + offset);
// todo refactor
UNUSED_FUNC static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
......@@ -878,37 +878,20 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// compressed ts block
pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
int32_t tsLen = 0;
int32_t numOfBlocks = 0;
if (pQueryInfo->tsBuf != NULL) {
int32_t vnodeId = htonl(pQueryMsg->head.vgId);
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, vnodeId);
assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent
// todo refactor
if (fseek(pQueryInfo->tsBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
tscError("%p: fseek failed: %s", pSql, tstrerror(code));
// note: here used the index instead of actual vnode id.
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
size_t s = fread(pMsg, 1, pBlockInfo->compLen, pQueryInfo->tsBuf->f);
if (s != pBlockInfo->compLen) {
int code = TAOS_SYSTEM_ERROR(ferror(pQueryInfo->tsBuf->f));
tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
return code;
pMsg += pQueryMsg->tsLen;
pMsg += pBlockInfo->compLen;
tsLen = pBlockInfo->compLen;
numOfBlocks = pBlockInfo->numOfBlocks;
pQueryMsg->tsLen = htonl(tsLen);
pQueryMsg->tsNumOfBlocks = htonl(numOfBlocks);
if (pQueryInfo->tsBuf != NULL) {
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
pQueryMsg->tsLen = htonl(pQueryMsg->tsLen);
pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
......@@ -32,11 +32,26 @@ typedef struct SInsertSupporter {
static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
static bool tsCompare(int32_t order, int64_t left, int64_t right) {
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
if (left == right) {
return 0;
if (order == TSDB_ORDER_ASC) {
return left < right;
return left < right? -1:1;
} else {
return left > right;
return left > right? -1:1;
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
while (tsBufNextPos(pTSBuf)) {
STSElem el1 = tsBufGetElem(pTSBuf);
int32_t res = tVariantCompare(el1.tag, tag1);
if (res != 0) { // it is a record with new tag
......@@ -51,10 +66,10 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
SLimitVal* pLimit = &pQueryInfo->limit;
int32_t order = pQueryInfo->order.order;
SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);
pSubQueryInfo1->tsBuf = output1;
pSubQueryInfo2->tsBuf = output2;
......@@ -88,59 +103,74 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
int64_t numOfInput1 = 1;
int64_t numOfInput2 = 1;
while (1) {
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
while(1) {
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
#ifdef _DEBUG_VIEW
tscInfo("%" PRId64 ", tags:%"PRId64" \t %" PRId64 ", tags:%"PRId64, elem1.ts, elem1.tag.i64Key, elem2.ts, elem2.tag.i64Key);
// no data in pSupporter1 anymore, jump out of loop
if (!tsBufIsValidElem(&elem)) {
int32_t res = tVariantCompare(elem1.tag, elem2.tag);
if (res == -1 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
// find the data in supporter2 with the same tag value
STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);
} else if ((res > 0) || (res == 0 && tsCompare(order, elem2.ts, elem1.ts))) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
* there are elements in pSupporter2 with the same tag, continue
tVariant tag1 = {0};
tVariantAssign(&tag1, elem.tag);
} else {
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondry merge of in the client.
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (win->skey > elem1.ts) {
win->skey = elem1.ts;
if (win->ekey < elem1.ts) {
win->ekey = elem1.ts;
if (tsBufIsValidElem(&e2)) {
while (1) {
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
// data with current are exhausted
if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;
if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
skipRemainValue(pSupporter1->pTSBuf, &tag1);
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondary merge of in the client.
int32_t re = tsCompare(order, elem1.ts, elem2.ts);
if (re < 0) {
} else if (re > 0) {
} else {
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (win->skey > elem1.ts) {
win->skey = elem1.ts;
if (win->ekey < elem1.ts) {
win->ekey = elem1.ts;
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;//offset apply to projection?
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
} else { // no data in pSupporter2, ignore current data in pSupporter2
skipRemainValue(pSupporter1->pTSBuf, &tag1);
......@@ -162,8 +192,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elasped time:%"PRId64" us", pSql, numOfInput1, numOfInput2, output1->numOfTotal,
output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1), et - st);
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfVnodes, win->skey, win->ekey,
tsBufGetNumOfVnodes(output1), et - st);
return output1->numOfTotal;
......@@ -248,6 +279,68 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
return false;
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0;
int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);
// The virtual node, of which all tables are disqualified after the timestamp intersection,
// is removed to avoid next stage query.
// TODO: If tables from some vnodes are not qualified for next stage query, discard them.
for (int32_t k = 0; k < taosArrayGetSize(pVgroupTables);) {
SVgroupTableInfo* p = taosArrayGet(pVgroupTables, k);
bool found = false;
for (int32_t f = 0; f < num; ++f) {
if (p->vgInfo.vgId == list[f]) {
found = true;
if (!found) {
tscRemoveVgroupTableGroup(pVgroupTables, k);
} else {
assert(taosArrayGetSize(pVgroupTables) > 0);
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0;
int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);
size_t numOfGroups = taosArrayGetSize(pVgroupTables);
SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo));
SVgroupTableInfo info;
for (int32_t i = 0; i < num; ++i) {
int32_t vnodeId = list[i];
for (int32_t j = 0; j < numOfGroups; ++j) {
SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j);
if (p1->vgInfo.vgId == vnodeId) {
tscVgroupTableCopy(&info, p1);
taosArrayPush(pNew, &info);
return pNew;
* launch secondary stage query to fetch the result that contains timestamp in set
......@@ -322,12 +415,11 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
pQueryInfo->groupbyExpr = pSupporter->groupInfo;
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
pSupporter->exprList = NULL;
......@@ -341,7 +433,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
* during the timestamp intersection.
pSupporter->limit = pQueryInfo->limit;
pNewQueryInfo->limit = pSupporter->limit;
pQueryInfo->limit = pSupporter->limit;
SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);
......@@ -356,7 +448,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
tscPrintSelectClause(pNew, 0);
pExpr = tscSqlExprGet(pQueryInfo, 0);
......@@ -371,39 +463,21 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pExpr->numOfParams = 1;
int32_t num = 0;
int32_t *list = NULL;
tsBufGetVnodeIdList(pNewQueryInfo->tsBuf, &num, &list);
if (pTableMetaInfo->pVgroupTables != NULL) {
for(int32_t k = 0; k < taosArrayGetSize(pTableMetaInfo->pVgroupTables);) {
SVgroupTableInfo* p = taosArrayGet(pTableMetaInfo->pVgroupTables, k);
bool found = false;
for(int32_t f = 0; f < num; ++f) {
if (p->vgInfo.vgId == list[f]) {
found = true;
if (!found) {
tscRemoveVgroupTableGroup(pTableMetaInfo->pVgroupTables, k);
} else {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->pVgroupTables != NULL);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
pTableMetaInfo->pVgroupTables = p;
} else {
filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
assert(taosArrayGetSize(pTableMetaInfo->pVgroupTables) > 0);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
//prepare the subqueries object failed, abort
......@@ -517,17 +591,29 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
vgTables = taosArrayInit(4, sizeof(STableIdInfo));
info.itemList = vgTables;
if (taosArrayGetSize(result) > 0) {
SVgroupTableInfo* prevGroup = taosArrayGet(result, taosArrayGetSize(result) - 1);
tscDebug("%p vgId:%d, tables:%"PRId64, pSql, prevGroup->vgInfo.vgId, taosArrayGetSize(prevGroup->itemList));
taosArrayPush(result, &info);
tscDebug("%p tid:%d, uid:%"PRIu64",vgId:%d added for vnode query", pSql, tt->tid, tt->uid, tt->vgId)
STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
taosArrayPush(vgTables, &item);
tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added", pSql, tt->tid, tt->uid, tt->vgId);
prev = tt;
pTableMetaInfo->pVgroupTables = result;
pTableMetaInfo->vgroupIndex = 0;
if (taosArrayGetSize(result) > 0) {
SVgroupTableInfo* g = taosArrayGet(result, taosArrayGetSize(result) - 1);
tscDebug("%p vgId:%d, tables:%"PRId64, pSql, g->vgInfo.vgId, taosArrayGetSize(g->itemList));
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
......@@ -602,11 +688,11 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
tscDebug("%p all subqueries retrieve <tid, tags> complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match, %d, %d", pParentSql, p1->num, p2->num);
// sort according to the tag value
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);
......@@ -655,6 +741,19 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
qsort((*s1)->pData, t1, size, tidTagsCompar);
qsort((*s2)->pData, t2, size, tidTagsCompar);
#if 0
for(int32_t k = 0; k < t1; ++k) {
STidTags* p = (*s1)->pData + size * k;
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
for(int32_t k = 0; k < t1; ++k) {
STidTags* p = (*s2)->pData + size * k;
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
tscDebug("%p tags match complete, result: %"PRId64", %"PRId64, pParentSql, t1, t2);
......@@ -958,6 +1057,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
assert(pState->numOfRemain > 0);
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub);
......@@ -971,6 +1071,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
// update the records for each subquery in parent sql object.
bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);
for (int32_t i = 0; i < pState->numOfSub; ++i) {
if (pParentSql->pSubs[i] == NULL) {
tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
......@@ -984,7 +1085,10 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pRes1->numOfRows, pRes1->numOfTotal);
assert(pRes1->row < pRes1->numOfRows);
} else {
pRes1->numOfClauseTotal += pRes1->numOfRows;
if (!stableQuery) {
pRes1->numOfClauseTotal += pRes1->numOfRows;
tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
pRes1->numOfRows, pRes1->numOfTotal);
......@@ -994,7 +1098,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
assert(pSql->subState.numOfSub >= 1);
int32_t numOfFetch = 0;
......@@ -1056,11 +1160,22 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if (numOfFetch <= 0) {
bool tryNextVnode = false;
SSqlObj* pp = pSql->pSubs[0];
SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0);
bool orderedPrjQuery = false;
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0);
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0);
if (orderedPrjQuery) {
// get the number of subquery that need to retrieve the next vnode.
if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) {
if (orderedPrjQuery) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
......@@ -1164,7 +1279,6 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex);
// the column transfer support struct has been built
if (pRes->pColumnIndex != NULL) {
......@@ -1260,21 +1374,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// wait for the other subqueries response from vnode
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
......@@ -338,34 +338,6 @@ void tscFreeSqlResult(SSqlObj* pSql) {
memset(&pSql->res, 0, sizeof(SSqlRes));
void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
// pSql->sqlstr will be used by tscBuildQueryStreamDesc
// if (pObj->signature == pObj) {
// }
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscResetSqlCmdObj(pCmd, false);
static void tscFreeSubobj(SSqlObj* pSql) {
if (pSql->subState.numOfSub == 0) {
......@@ -434,22 +406,32 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("%p start to free sqlObj", pSql);
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
pSql->signature = NULL;
pSql->fp = NULL;
SSqlCmd* pCmd = &pSql->cmd;
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscResetSqlCmdObj(pCmd, false);
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
pCmd->allocSize = 0;
......@@ -1714,6 +1696,17 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
taosArrayRemove(pVgroupTable, index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
memset(info, 0, sizeof(SVgroupTableInfo));
info->vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
info->itemList = taosArrayClone(pInfo->itemList);
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
if (pVgroupTables == NULL) {
return NULL;
......@@ -1725,14 +1718,8 @@ SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
SVgroupTableInfo info;
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
memset(&info, 0, sizeof(SVgroupTableInfo));
info.vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info.vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
tscVgroupTableCopy(&info, pInfo);
info.itemList = taosArrayClone(pInfo->itemList);
taosArrayPush(pa, &info);
......@@ -51,6 +51,7 @@ extern char tsLocale[];
extern char tsCharset[]; // default encode string
extern int32_t tsEnableCoreFile;
extern int32_t tsCompressMsgSize;
extern char tsTempDir[];
//query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing
......@@ -58,6 +58,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
int32_t tsEnableCoreFile = 0;
int32_t tsMaxBinaryDisplayWidth = 30;
char tsTempDir[TSDB_FILENAME_LEN] = "/tmp/";
* denote if the server needs to compress response message at the application layer to client, including query rsp,
......@@ -1310,6 +1311,16 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.option = "tempDir";
cfg.ptr = tsTempDir;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsTempDir);
cfg.unitType = TAOS_CFG_UTYPE_NONE;
void taosInitGlobalCfg() {
......@@ -108,7 +108,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
case TSDB_DATA_TYPE_BINARY: { // todo refactor, extract a method
pVar->pz = calloc(len, sizeof(char));
pVar->pz = calloc(len + 1, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
......@@ -20,9 +20,12 @@
extern "C" {
int32_t dnodeInitVnodeWrite();
void dnodeCleanupVnodeWrite();
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg);
int32_t dnodeInitVWrite();
void dnodeCleanupVWrite();
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
#ifdef __cplusplus
......@@ -62,7 +62,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
{"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer},
......@@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL;
int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue;
......@@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
......@@ -268,7 +268,7 @@ static void dnodeGetEmail(char* filepath) {
if (taosTRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
if (taosRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
......@@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
void *dnodeAllocateVnodeRqueue(void *pVnode) {
void *dnodeAllocVReadQueue(void *pVnode) {
taos_queue queue = taosOpenQueue();
if (queue == NULL) {
......@@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
return queue;
void dnodeFreeVnodeRqueue(void *rqueue) {
void dnodeFreeVReadQueue(void *rqueue) {
// dynamically adjust the number of threads
......@@ -15,74 +15,65 @@
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tutil.h"
#include "tglobal.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tsync.h"
#include "vnode.h"
#include "dnodeInt.h"
#include "syncInt.h"
#include "dnodeVWrite.h"
#include "dnodeMgmt.h"
#include "dnodeInt.h"
typedef struct {
taos_qall qall;
taos_qset qset; // queue set
pthread_t thread; // thread
int32_t workerId; // worker ID
taos_qall qall;
taos_qset qset; // queue set
int32_t workerId; // worker ID
pthread_t thread; // thread
} SWriteWorker;
typedef struct {
SRspRet rspRet;
int32_t processedCount;
int32_t code;
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
SRspRet rspRet;
SRpcMsg rpcMsg;
int32_t processedCount;
int32_t code;
int32_t contLen;
void * pCont;
} SWriteMsg;
typedef struct {
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker;
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *worker;
pthread_mutex_t mutex;
} SWriteWorkerPool;
static SWriteWorkerPool tsVWriteWP;
static void *dnodeProcessWriteQueue(void *param);
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
SWriteWorkerPool wWorkerPool;
int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL);
int32_t dnodeInitVnodeWrite() {
wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1;
pthread_mutex_init(&wWorkerPool.mutex, NULL);
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
wWorkerPool.writeWorker[i].workerId = i;
for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
tsVWriteWP.worker[i].workerId = i;
dInfo("dnode write is initialized, max worker %d", wWorkerPool.max);
dInfo("dnode vwrite is initialized, max worker %d", tsVWriteWP.max);
return 0;
void dnodeCleanupVnodeWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
void dnodeCleanupVWrite() {
for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) {
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
......@@ -90,13 +81,13 @@ void dnodeCleanupVnodeWrite() {
dInfo("dnode write is closed");
dInfo("dnode vwrite is closed");
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
char *pCont = (char *)pMsg->pCont;
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) {
char *pCont = pMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
SMsgDesc *pDesc = (SMsgDesc *)pCont;
......@@ -111,7 +102,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) {
// put message into queue
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
......@@ -130,12 +121,12 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
void *dnodeAllocateVnodeWqueue(void *pVnode) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
void *dnodeAllocVWriteQueue(void *pVnode) {
SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
void *queue = taosOpenQueue();
if (queue == NULL) {
return NULL;
......@@ -143,7 +134,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) {
return NULL;
......@@ -152,7 +143,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
if (pWorker->qall == NULL) {
return NULL;
pthread_attr_t thAttr;
......@@ -160,37 +151,35 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
dError("failed to create thread to process vwrite queue since %s", strerror(errno));
queue = NULL;
} else {
dDebug("write worker:%d is launched", pWorker->workerId);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
dDebug("dnode vwrite worker:%d is launched", pWorker->workerId);
tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max;
} else {
taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max;
dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
dDebug("pVnode:%p, dnode vwrite queue:%p is allocated", pVnode, queue);
return queue;
void dnodeFreeVnodeWqueue(void *wqueue) {
void dnodeFreeVWriteQueue(void *wqueue) {
// dynamically adjust the number of threads
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
SWriteMsg *pWrite = (SWriteMsg *)param;
if (pWrite == NULL) return;
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (param == NULL) return;
SWriteMsg *pWrite = param;
if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
......@@ -215,44 +204,45 @@ static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param;
SWriteMsg * pWrite;
SWalHead * pHead;
int32_t numOfMsgs;
int type;
void * pVnode, *item;
SRspRet * pRspRet;
void * pVnode;
void * pItem;
int32_t numOfMsgs;
int32_t qtype;
dDebug("write worker:%d is running", pWorker->workerId);
dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) {
dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset);
dDebug("qset:%p, dnode vwrite got no message from qset, exiting", pWorker->qset);
for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL;
pRspRet = NULL;
taosGetQitem(pWorker->qall, &type, &item);
if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item;
taosGetQitem(pWorker->qall, &qtype, &pItem);
if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem;
pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0;
pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
} else if (type == TAOS_QTYPE_CQ) {
pHead = (SWalHead *)((char*)item + sizeof(SSyncHead));
} else if (qtype == TAOS_QTYPE_CQ) {
pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead));
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
} else {
pHead = (SWalHead *)item;
pHead = pItem;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet);
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
pHead->version, tstrerror(code));
......@@ -267,17 +257,17 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &type, &item);
if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item;
dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code);
} else if (type == TAOS_QTYPE_FWD) {
pHead = (SWalHead *)item;
taosGetQitem(pWorker->qall, &qtype, &pItem);
if (qtype == TAOS_QTYPE_RPC) {
pWrite = pItem;
dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code);
} else if (qtype == TAOS_QTYPE_FWD) {
pHead = pItem;
vnodeConfirmForward(pVnode, pHead->version, 0);
} else {
......@@ -285,19 +275,3 @@ static void *dnodeProcessWriteQueue(void *param) {
return NULL;
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset);
if (num > 0) {
} else {
pWorker->qset = NULL;
dDebug("write worker:%d is released", pWorker->workerId);
......@@ -53,11 +53,11 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue();
......@@ -51,6 +51,7 @@ void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh);
void walClose(twalh);
int32_t walRenew(twalh);
int32_t walWrite(twalh, SWalHead *);
......@@ -244,7 +244,7 @@ int32_t shellRunCommand(TAOS* con, char* command) {
*p++ = c;
if (c == ';') {
if (c == ';' && quote == 0) {
c = *p;
*p = 0;
if (shellRunSingleCommand(con, cmd) < 0) {
......@@ -239,6 +239,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
SHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code
......@@ -52,9 +52,10 @@ extern "C" {
#include "osWindows.h"
#include "osDef.h"
#include "osAlloc.h"
#include "osAtomic.h"
#include "osCommon.h"
#include "osDef.h"
#include "osDir.h"
#include "osFile.h"
#include "osLz4.h"
......@@ -13,25 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifdef __cplusplus
extern "C" {
#define tmalloc(size) malloc(size)
#define tcalloc(size) calloc(1, size)
#define tcalloc(nmemb, size) calloc(nmemb, size)
#define trealloc(p, size) realloc(p, size)
#define tmemalign(alignment, size) malloc(size)
#define tfree(p) free(p)
#define tmemzero(p, size) memset(p, 0, size)
void *tmalloc(int32_t size);
void *tcalloc(int32_t size);
void *tcalloc(int32_t nmemb, int32_t size);
void *trealloc(void *p, int32_t size);
void *tmemalign(int32_t alignment, int32_t size);
void tfree(void *p);
......@@ -72,8 +72,6 @@ extern "C" {
#include <sys/utsname.h>
#define taosFSendFile(outfile, infile, offset, count) taosFSendFileImp(outfile, infile, offset, size)
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define tsem_t dispatch_semaphore_t
......@@ -20,46 +20,52 @@
extern "C" {
ssize_t taosTReadImp(int fd, void *buf, size_t count);
ssize_t taosTWriteImp(int fd, void *buf, size_t count);
#define tread(fd, buf, count) read(fd, buf, count)
#define twrite(fd, buf, count) write(fd, buf, count)
#define tlseek(fd, offset, whence) lseek(fd, offset, whence)
#define tclose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
} \
ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size);
int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count);
int64_t taosReadImp(int32_t fd, void *buf, int64_t count);
int64_t taosWriteImp(int32_t fd, void *buf, int64_t count);
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence);
int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath);
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define taosFSendFile(outfile, infile, offset, count) taosTSendFileImp(fileno(outfile), fileno(infile), offset, size)
#define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
#define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence)
#define taosClose(x) tclose(x)
#define taosTRead(fd, buf, count) taosTReadImp(fd, buf, count)
#define taosTWrite(fd, buf, count) taosTWriteImp(fd, buf, count)
#define taosLSeek(fd, offset, whence) lseek(fd, offset, whence)
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size);
int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size);
void taosSetRandomFileFailFactor(int factor);
void taosSetRandomFileFailFactor(int32_t factor);
void taosSetRandomFileFailOutput(const char *path);
ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line);
#undef taosTRead
#undef taosTWrite
int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line);
int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line);
int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line);
#undef taosRead
#undef taosWrite
#undef taosLSeek
#define taosTRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosTWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__)
int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath);
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath);
#define taosFtruncate ftruncate
int32_t taosFtruncate(int32_t fd, int64_t length);
#ifdef __cplusplus
......@@ -33,21 +33,19 @@ extern "C" {
} \
typedef int SOCKET;
typedef int32_t SOCKET;
#define taosClose(x) taosCloseSocket(x)
ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags);
ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count);
ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count);
int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags);
int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen);
int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count);
int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count);
#undef taosSend
#undef taosSendto
#undef taosReadSocket
......@@ -60,14 +58,14 @@ extern "C" {
int taosSetNonblocking(SOCKET sock, int on);
void taosBlockSIGPIPE();
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosBlockSIGPIPE();
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen);
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
uint32_t taosInetAddr(char *ipAddr);
uint32_t taosInetAddr(char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
#ifdef __cplusplus
......@@ -62,11 +62,8 @@ extern "C" {
#define taosFSendFile(outfile, infile, offset, count) taosFSendFileImp(outfile, infile, offset, size)
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
extern int taosFtruncate(int fd, int64_t length);
#define SWAP(a, b, c) \
......@@ -139,7 +136,6 @@ typedef int (*__compar_fn_t)(const void *, const void *);
#define in_addr_t unsigned long
#define socklen_t int
#define htobe64 htonll
#define twrite write
#define getpid _getpid
struct tm *localtime_r(const time_t *timep, struct tm *result);
......@@ -19,21 +19,19 @@
#define _SEND_FILE_STEP_ 1000
int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count) {
int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) {
fseek(in_file, (int32_t)(*offset), 0);
int writeLen = 0;
uint8_t buffer[_SEND_FILE_STEP_] = { 0 };
int writeLen = 0;
uint8_t buffer[_SEND_FILE_STEP_] = {0};
for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file);
if (rlen <= 0) {
return writeLen;
else if (rlen < _SEND_FILE_STEP_) {
} else if (rlen < _SEND_FILE_STEP_) {
fwrite(buffer, 1, rlen, out_file);
return (int)(writeLen + rlen);
else {
} else {
fwrite(buffer, 1, _SEND_FILE_STEP_, in_file);
writeLen += _SEND_FILE_STEP_;
......@@ -44,8 +42,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
size_t rlen = fread(buffer, 1, remain, in_file);
if (rlen <= 0) {
return writeLen;
else {
} else {
fwrite(buffer, 1, remain, out_file);
writeLen += remain;
......@@ -54,7 +51,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
return writeLen;
ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
uError("not implemented yet");
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t size) {
uError("taosSendFile not implemented yet");
return -1;
\ No newline at end of file
......@@ -17,10 +17,10 @@
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
#include "talloc.h"
#include "osAlloc.h"
void *tmalloc(int32_t size) {
void *p = malloc(size);
......@@ -32,11 +32,11 @@ void *tmalloc(int32_t size) {
return p;
void *tcalloc(int32_t size) {
void *p = calloc(1, size);
void *tcalloc(int32_t nmemb, int32_t size) {
void *p = calloc(nmemb, size);
if (p == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to calloc memory, size:%d reason:%s", size, strerror(errno));
uError("failed to calloc memory, nmemb:%d size:%d reason:%s", nmemb, size, strerror(errno));
return p;
......@@ -20,7 +20,7 @@
ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags) {
int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
return -1;
......@@ -29,8 +29,8 @@ ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags) {
return send(sockfd, buf, len, flags);
ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr,
socklen_t addrlen) {
int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags,
const struct sockaddr *dest_addr, socklen_t addrlen) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
return -1;
......@@ -39,7 +39,7 @@ ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags,
return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count) {
int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
return -1;
......@@ -48,7 +48,7 @@ ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count) {
return read(fd, buf, count);
ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count) {
int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = EINTR;
return -1;
......@@ -61,10 +61,10 @@ ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count) {
static int random_file_fail_factor = 20;
static int32_t random_file_fail_factor = 20;
static FILE *fpRandomFileFailOutput = NULL;
void taosSetRandomFileFailFactor(int factor) {
void taosSetRandomFileFailFactor(int32_t factor) {
random_file_fail_factor = factor;
......@@ -77,7 +77,7 @@ static void close_random_file_fail_output() {
static void random_file_fail_output_sig(int sig) {
static void random_file_fail_output_sig(int32_t sig) {
fprintf(fpRandomFileFailOutput, "signal %d received.\n", sig);
struct sigaction act = {0};
......@@ -105,7 +105,7 @@ void taosSetRandomFileFailOutput(const char *path) {
sigaction(SIGILL, &act, NULL);
ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line) {
int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) {
errno = EIO;
......@@ -113,10 +113,10 @@ ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file
return taosTReadImp(fd, buf, count);
return taosReadImp(fd, buf, count);
ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line) {
int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) {
errno = EIO;
......@@ -124,10 +124,10 @@ ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *fil
return taosTWriteImp(fd, buf, count);
return taosWriteImp(fd, buf, count);
off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line) {
int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) {
errno = EIO;
......@@ -135,7 +135,7 @@ off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, ui
return lseek(fd, offset, whence);
return taosLSeekImp(fd, offset, whence);
......@@ -15,16 +15,22 @@
#include "os.h"
#include "tglobal.h"
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char *tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX];
char *tmpDir = "/tmp/";
int32_t len = strlen(tsTempDir);
memcpy(tmpPath, tsTempDir, len);
if (tmpPath[len - 1] != '/') {
tmpPath[len++] = '/';
strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix);
strcpy(tmpPath + len, tdengineTmpFileNamePrefix);
if (strlen(tmpPath) + strlen(fileNamePrefix) + strlen("-%d-%s") < PATH_MAX) {
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%d-%s");
......@@ -34,10 +40,10 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
taosRandStr(rand, tListLen(rand) - 1);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
// rename file name
int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath) {
int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath) {
int32_t ts = taosGetTimestampSec();
char fname[PATH_MAX] = {0}; // max file name length must be less than 255
......@@ -46,12 +52,13 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
if (delimiterPos == NULL) return -1;
int32_t fileNameLen = 0;
if (suffix)
if (suffix) {
fileNameLen = snprintf(fname, PATH_MAX, "%s.%d.%s", delimiterPos + 1, ts, suffix);
} else {
fileNameLen = snprintf(fname, PATH_MAX, "%s.%d", delimiterPos + 1, ts);
size_t len = (size_t)((delimiterPos - fullPath) + fileNameLen + 1);
int32_t len = (int32_t)((delimiterPos - fullPath) + fileNameLen + 1);
if (*dstPath == NULL) {
*dstPath = calloc(1, len + 1);
if (*dstPath == NULL) return -1;
......@@ -64,9 +71,9 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
return rename(fullPath, *dstPath);
ssize_t taosTReadImp(int fd, void *buf, size_t count) {
size_t leftbytes = count;
ssize_t readbytes;
int64_t taosReadImp(int32_t fd, void *buf, int64_t count) {
int64_t leftbytes = count;
int64_t readbytes;
char * tbuf = (char *)buf;
while (leftbytes > 0) {
......@@ -78,19 +85,19 @@ ssize_t taosTReadImp(int fd, void *buf, size_t count) {
return -1;
} else if (readbytes == 0) {
return (ssize_t)(count - leftbytes);
return (int64_t)(count - leftbytes);
leftbytes -= readbytes;
tbuf += readbytes;
return (ssize_t)count;
return count;
ssize_t taosTWriteImp(int fd, void *buf, size_t n) {
size_t nleft = n;
ssize_t nwritten = 0;
int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) {
int64_t nleft = n;
int64_t nwritten = 0;
char * tbuf = (char *)buf;
while (nleft > 0) {
......@@ -105,13 +112,18 @@ ssize_t taosTWriteImp(int fd, void *buf, size_t n) {
tbuf += nwritten;
return (ssize_t)n;
return n;
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
return (int64_t)tlseek(fd, (long)offset, whence);
ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
size_t leftbytes = size;
ssize_t sentbytes;
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) {
int64_t leftbytes = size;
int64_t sentbytes;
while (leftbytes > 0) {
......@@ -126,7 +138,7 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
return -1;
} else if (sentbytes == 0) {
return (ssize_t)(size - leftbytes);
return (int64_t)(size - leftbytes);
leftbytes -= sentbytes;
......@@ -134,4 +146,17 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
return size;
int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size) {
return taosSendFile(fileno(outfile), fileno(infile), offset, size);
int32_t taosFtruncate(int32_t fd, int64_t length) {
return ftruncate(fd, length);
\ No newline at end of file
......@@ -19,8 +19,8 @@
int taosSetNonblocking(SOCKET sock, int on) {
int flags = 0;
int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
int32_t flags = 0;
if ((flags = fcntl(sock, F_GETFL, 0)) < 0) {
uError("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
return 1;
......@@ -43,7 +43,7 @@ void taosBlockSIGPIPE() {
sigset_t signal_mask;
sigaddset(&signal_mask, SIGPIPE);
int rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
uError("failed to block SIGPIPE");
......@@ -53,7 +53,7 @@ void taosBlockSIGPIPE() {
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
......@@ -46,5 +46,16 @@ void osInit() {
strcpy(tsDnodeDir, "");
strcpy(tsMnodeDir, "");
strcpy(tsOsName, "Windows");
const char *tmpDir = getenv("tmp");
if (tmpDir != NULL) {
tmpDir = getenv("temp");
if (tmpDir != NULL) {
strcpy(tsTempDir, tmpDir);
} else {
strcpy(tsTempDir, "C:\\Windows\\Temp");
......@@ -16,17 +16,20 @@
#include "os.h"
#include "tulog.h"
#include "tglobal.h"
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char* tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX];
char tmpPath[PATH_MAX];
char *tmpDir = getenv("tmp");
if (tmpDir == NULL) {
tmpDir = "";
int32_t len = (int32_t)strlen(tsTempDir);
memcpy(tmpPath, tsTempDir, len);
if (tmpPath[len - 1] != '/' && tmpPath[len - 1] != '\\') {
tmpPath[len++] = '\\';
strcpy(tmpPath, tmpDir);
strcpy(tmpPath + len, tdengineTmpFileNamePrefix);
strcat(tmpPath, tdengineTmpFileNamePrefix);
if (strlen(tmpPath) + strlen(fileNamePrefix) + strlen("-%d-%s") < PATH_MAX) {
strcat(tmpPath, fileNamePrefix);
......@@ -40,19 +43,19 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
#define _SEND_FILE_STEP_ 1000
int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count) {
int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) {
fseek(in_file, (int32_t)(*offset), 0);
int writeLen = 0;
int64_t writeLen = 0;
uint8_t buffer[_SEND_FILE_STEP_] = { 0 };
for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
for (int64_t len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file);
if (rlen <= 0) {
return writeLen;
else if (rlen < _SEND_FILE_STEP_) {
fwrite(buffer, 1, rlen, out_file);
return (int)(writeLen + rlen);
return (int64_t)(writeLen + rlen);
else {
fwrite(buffer, 1, _SEND_FILE_STEP_, in_file);
......@@ -60,7 +63,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
int remain = count - writeLen;
int64_t remain = count - writeLen;
if (remain > 0) {
size_t rlen = fread(buffer, 1, remain, in_file);
if (rlen <= 0) {
......@@ -75,12 +78,12 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
return writeLen;
ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
uError("taosTSendFileImp no implemented yet");
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t size) {
uError("taosSendFile no implemented yet");
return 0;
int taosFtruncate(int fd, int64_t length) {
int32_t taosFtruncate(int32_t fd, int64_t length) {
uError("taosFtruncate no implemented yet");
return 0;
\ No newline at end of file
......@@ -34,7 +34,7 @@ void taosWinSocketInit() {
int taosSetNonblocking(SOCKET sock, int on) {
int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
u_long mode;
if (on) {
mode = 1;
......@@ -48,7 +48,7 @@ int taosSetNonblocking(SOCKET sock, int on) {
void taosBlockSIGPIPE() {}
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
return 0;
......@@ -72,7 +72,7 @@ int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int op
uint32_t taosInetAddr(char *ipAddr) {
uint32_t value;
int ret = inet_pton(AF_INET, ipAddr, &value);
int32_t ret = inet_pton(AF_INET, ipAddr, &value);
if (ret <= 0) {
} else {
......@@ -40,6 +40,19 @@ typedef struct SGroupResInfo {
int32_t rowId;
} SGroupResInfo;
typedef struct SResultRowPool {
int32_t elemSize;
int32_t blockSize;
int32_t numOfElemPerBlock;
struct {
int32_t blockIndex;
int32_t pos;
} position;
SArray* pData; // SArray<void*>
} SResultRowPool;
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
......@@ -48,14 +61,14 @@ typedef struct SSqlGroupbyExpr {
int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr;
typedef struct SWindowResult {
typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t rowId:15;
bool closed:1; // this result status: closed or opened
uint16_t numOfRows; // number of rows of current time window
SResultInfo* resultInfo; // For each result column, there is a resultInfo
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current time window
} SWindowResult;
} SResultRow;
* If the number of generated results is greater than this value,
......@@ -69,16 +82,14 @@ typedef struct SResultRec {
} SResultRec;
typedef struct SWindowResInfo {
SWindowResult* pResult; // result list
SHashObj* hashList; // hash list for quick access
int16_t type; // data type for hash key
SResultRow** pResult; // result list
int16_t type:8; // data type for hash key
int32_t size:24; // number of result set
int32_t threshold; // threshold to halt query and return the generated results.
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int32_t size; // number of result set
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
int64_t threshold; // threshold to halt query and return the generated results.
int64_t interval; // time window interval
} SWindowResInfo;
typedef struct SColumnFilterElem {
......@@ -94,7 +105,7 @@ typedef struct SSingleColumnFilterInfo {
SColumnFilterElem* pFilters;
} SSingleColumnFilterInfo;
typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
typedef struct STableQueryInfo {
TSKEY lastKey;
int32_t groupIndex; // group id in table list
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
......@@ -122,7 +133,9 @@ typedef struct SQueryCostInfo {
uint32_t discardBlocks;
uint64_t elapsedTime;
uint64_t firstStageMergeTime;
uint64_t internalSupSize;
uint64_t winInfoSize;
uint64_t tableInfoSize;
uint64_t hashSize;
uint64_t numOfTimeWindows;
} SQueryCostInfo;
......@@ -155,11 +168,11 @@ typedef struct SQuery {
typedef struct SQueryRuntimeEnv {
jmp_buf env;
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SResultRow* pResultRow; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
SQLFunctionCtx* pCtx;
int32_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
uint16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo;
SWindowResInfo windowResInfo;
......@@ -175,6 +188,11 @@ typedef struct SQueryRuntimeEnv {
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool
int32_t* rowCellInfoOffset;// offset value for each row result cell info
} SQueryRuntimeEnv;
enum {
......@@ -71,7 +71,7 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp);
void* taosDestoryFillInfo(SFillInfo *pFillInfo);
void* taosDestroyFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
......@@ -223,13 +223,6 @@ typedef struct tSQLExprList {
tSQLExprItem *a; /* One entry for each expression */
} tSQLExprList;
typedef struct tSQLExprListList {
int32_t nList; /* Number of expressions on the list */
int32_t nAlloc; /* Number of entries allocated below */
tSQLExprList **a; /* one entry for each row */
} tSQLExprListList;
* @param yyp The parser
......@@ -135,6 +135,12 @@ int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf);
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId);
int32_t dumpFileBlockByVnodeId(STSBuf* pTSBuf, int32_t vnodeId, void* buf, int32_t* len, int32_t* numOfBlocks);
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, tVariant* pTag);
bool tsBufIsValidElem(STSElem* pElem);
#ifdef __cplusplus
......@@ -15,13 +15,22 @@
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \
*(uint64_t *)(_k) = (_uid); \
memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \
} while (0)
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
int32_t getOutputInterResultBufSize(SQuery* pQuery);
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes);
void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow);
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src);
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type);
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, int32_t size, int32_t threshold, int16_t type);
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
......@@ -33,9 +42,9 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return &pWindowResInfo->pResult[slot];
return pWindowResInfo->pResult[slot];
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
......@@ -43,9 +52,9 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize);
int32_t initResultRow(SResultRow *pResultRow);
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult,
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult,
tFilePage* page) {
assert(pResult != NULL && pRuntimeEnv != NULL);
......@@ -62,4 +71,14 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval);
__filter_func_t *getRangeFilterFuncArray(int32_t type);
__filter_func_t *getValueFilterFuncArray(int32_t type);
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv);
SResultRowPool* initResultRowPool(size_t size);
SResultRow* getNewResultRow(SResultRowPool* p);
int64_t getResultRowPoolMemSize(SResultRowPool* p);
void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
......@@ -145,15 +145,14 @@ typedef struct SInterpInfoDetail {
int8_t primaryCol;
} SInterpInfoDetail;
typedef struct SResultInfo {
typedef struct SResultRowCellInfo {
int8_t hasResult; // result generated, not NULL value
bool initialized; // output buffer has been initialized
bool complete; // query has completed
bool superTableQ; // is super table query
uint32_t bufLen; // buffer size
uint64_t numOfRes; // num of output result in current buffer
void* interResultBuf; // output result buffer
} SResultInfo;
bool initialized; // output buffer has been initialized
bool complete; // query has completed
uint16_t numOfRes; // num of output result in current buffer
} SResultRowCellInfo;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo)))
struct SQLFunctionCtx;
......@@ -175,9 +174,11 @@ typedef struct SQLFunctionCtx {
int16_t inputBytes;
int16_t outputType;
int16_t outputBytes; // size of results, determined by function and input column data type
bool hasNull; // null value exist in current block
int16_t outputBytes; // size of results, determined by function and input column data type
int32_t interBufBytes; // internal buffer size
bool hasNull; // null value exist in current block
bool requireNull; // require null in some function
bool stableQuery;
int16_t functionId; // function id
void * aInputElemBuf;
char * aOutputBuf; // final result output buffer, point to sdata->data
......@@ -189,7 +190,8 @@ typedef struct SQLFunctionCtx {
void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SQLPreAggVal preAggVals;
tVariant tag;
SResultInfo *resultInfo;
SResultRowCellInfo *resultInfo;
SExtTagsInfo tagInfo;
} SQLFunctionCtx;
......@@ -274,16 +276,16 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha
(_r)->initialized = false; \
} while (0)
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf);
//void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen);
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, (size_t)bufLen);
#ifdef __cplusplus
......@@ -91,7 +91,7 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
pFillInfo->numOfTotal = 0;
void* taosDestoryFillInfo(SFillInfo* pFillInfo) {
void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
if (pFillInfo == NULL) {
return NULL;
......@@ -407,14 +407,14 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
if (pResultBuf->file != NULL) {
qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " bytes, inmem size:%dbytes, file size:%"PRId64" bytes",
pResultBuf->handle, pResultBuf->totalBufSize, listNEles(pResultBuf->lruList) * pResultBuf->pageSize,
qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f",
pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0,
} else {
qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle,
qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, no file created", pResultBuf->handle,
......@@ -472,13 +472,20 @@ void mergeIdenticalVnodeBufferTest() {
tsBufMerge(pTSBuf1, pTSBuf2);
EXPECT_EQ(pTSBuf1->numOfVnodes, 1);
EXPECT_EQ(pTSBuf1->numOfVnodes, 2);
EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num);
int32_t count = 0;
while (tsBufNextPos(pTSBuf1)) {
STSElem elem = tsBufGetElem(pTSBuf1);
EXPECT_EQ(elem.vnode, 12);
if (count++ < numOfTags * num) {
EXPECT_EQ(elem.vnode, 12);
} else {
EXPECT_EQ(elem.vnode, 77);
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts);
......@@ -149,7 +149,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
int sfd = open(name, O_RDONLY);
if (sfd < 0) break;
ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
if (ret < 0) break;
......@@ -406,7 +406,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
int sfd = open(fname, O_RDONLY);
if (sfd < 0) break;
code = taosTSendFile(pPeer->syncFd, sfd, NULL, size);
code = taosSendFile(pPeer->syncFd, sfd, NULL, size);
if (code < 0) break;
......@@ -13,10 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <regex.h>
#include <regex.h>
#include "os.h"
#include "talgo.h"
#include "tchecksum.h"
......@@ -428,7 +426,7 @@ int tsdbUpdateFileHeader(SFile *pFile) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
if (taosTWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -493,7 +491,7 @@ int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
return -1;
if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
if (taosRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read file %s header part with %d bytes, reason:%s", pFile->fname, TSDB_FILE_HEAD_SIZE,
......@@ -579,7 +579,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (taosTWrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
if (taosWrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, TSDB_FILE_HEAD_SIZE, fname,
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -620,7 +620,7 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
goto _err;
if (taosTRead(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
if (taosRead(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read %d bytes from file %s since %s", TSDB_FILE_HEAD_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -14,9 +14,7 @@
#include "os.h"
#include "talgo.h"
#include "tchecksum.h"
......@@ -335,7 +333,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
return -1;
if (taosTSendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) {
if (taosSendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -380,7 +378,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx->tid = pHelper->tableInfo.tid;
if (taosTWrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) {
if (taosWrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -432,7 +430,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(offset == pFile->info.size);
if (taosTWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) {
if (taosWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -454,7 +452,7 @@ int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffe
return -1;
if (taosTRead(pFile->fd, buffer, len) < len) {
if (taosRead(pFile->fd, buffer, len) < len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, offset, len,
......@@ -551,7 +549,7 @@ int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) {
return -1;
if (taosTRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) {
if (taosRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, pIdx->len,
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -608,7 +606,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
return -1;
if (taosTRead(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) {
if (taosRead(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) {
tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname,
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -823,7 +821,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
// Write the whole block to file
if (taosTWrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
if (taosWrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname,
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -1210,7 +1208,7 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
return -1;
if (taosTRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) {
if (taosRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname,
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -1325,7 +1323,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
if (taosTRead(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
if (taosRead(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -72,8 +72,8 @@ typedef struct STableCheckInfo {
STable* pTableObj;
SCompInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks
int32_t chosen; // indicate which iterator should move forward
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
int8_t chosen:2; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator
......@@ -265,7 +265,7 @@ void taosNotePrint(taosNoteInfo * pNote, const char * const format, ...)
buffer[len] = 0;
if (pNote->taosNoteFd >= 0) {
taosTWrite(pNote->taosNoteFd, buffer, (unsigned int)len);
taosWrite(pNote->taosNoteFd, buffer, (unsigned int)len);
if (pNote->taosNoteMaxLines > 0) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册