Commit 4e30fac8 authored by Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

......@@ -12,12 +12,38 @@ TDengine can be quickly integrated with the open-source data visualization system [Grafana](https://www.grafana.com/
- The TDengine cluster is deployed and running properly
- taosAdapter is installed and running properly. For details, see the [taosAdapter user manual](/reference/taosadapter)
Record the following information:
- The TDengine cluster REST API address, e.g. `http://tdengine.local:6041`.
- The TDengine cluster credentials, i.e. a username and password (a quick connectivity check is shown below).
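As a quick sanity check, the REST endpoint recorded above can be queried with `curl`. This is only an illustrative sketch; it assumes the example address and TDengine's default `root`/`taosdata` credentials, so substitute your own values:

```bash
# Send a simple SQL statement through taosAdapter's REST interface.
# Replace the host, user, and password with the values recorded above.
curl -u root:taosdata -d "show databases;" http://tdengine.local:6041/rest/sql
```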
## Installing Grafana
TDengine currently supports Grafana 7.0 and above. Users can download the installation package for their operating system from the Grafana official website and install it. The download address is: <https://grafana.com/grafana/download>.
TDengine currently supports Grafana 7.5 and above. Users can download the installation package for their operating system from the Grafana official website and install it. The download address is: <https://grafana.com/grafana/download>.
## Configuring Grafana
### Option 1: Install script
Set the cluster information as environment variables (a [`.env` (dotenv) file](https://hexdocs.pm/dotenvy/dotenv-file-format.html) can also be used):
```sh
export TDENGINE_API=http://tdengine.local:6041
# user + password
export TDENGINE_USER=user
export TDENGINE_PASSWORD=password
```
Run the install script:
```sh
bash -c "$(curl -fsSL https://raw.githubusercontent.com/taosdata/grafanaplugin/master/install.sh)"
```
This script automatically installs the Grafana plugin and configures the data source. After the installation completes, restart the Grafana service for the changes to take effect.
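On a systemd-based Linux host, for example, the restart is a single command; the service name below assumes the stock Grafana package:

```bash
# Restart Grafana so that the provisioned plugin and data source take effect
# (assumes the standard grafana-server systemd unit).
sudo systemctl restart grafana-server
```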
### Option 2: Manually install the TDengine data source plugin
Use the [`grafana-cli` command-line tool](https://grafana.com/docs/grafana/latest/administration/cli/) to [install](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) the plugin.
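As a sketch, a typical `grafana-cli` invocation is shown below; depending on how Grafana was installed it may need to run as root or as the grafana user, and Grafana must be restarted afterwards:

```bash
# Install the TDengine data source plugin from the Grafana plugin catalog
# (assumes the default plugin directory of a standard package installation).
sudo grafana-cli plugins install tdengine-datasource
```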
```bash
......@@ -48,11 +74,7 @@ sudo unzip tdengine-datasource-$GF_VERSION.zip -d /var/lib/grafana/plugins/
GF_INSTALL_PLUGINS=tdengine-datasource
```
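If Grafana runs in Docker, the environment variable above can be passed when the container starts; the following is only a sketch, and the container name, port mapping, and image tag should be adapted to your environment:

```bash
# Start Grafana in Docker and let it install the TDengine plugin on startup
# (container name, port mapping, and image tag are illustrative).
docker run -d --name grafana -p 3000:3000 \
  -e "GF_INSTALL_PLUGINS=tdengine-datasource" \
  grafana/grafana
```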
## Using Grafana
### Configuring the Data Source
Users can log in to the Grafana server (username/password: admin/admin) directly at <http://localhost:3000> and add a data source via `Configuration -> Data Sources` on the left, as shown in the figure below:
Afterwards, users can log in to the Grafana server (username/password: admin/admin) directly at <http://localhost:3000> and add a data source via `Configuration -> Data Sources` on the left, as shown in the figure below:
![TDengine Database Grafana plugin add data source](./add_datasource1.webp)
......@@ -92,4 +114,11 @@ GF_INSTALL_PLUGINS=tdengine-datasource
### Importing a Dashboard
In version 2.3.3.0 and above, you can import the TDinsight Dashboard (Grafana Dashboard ID: [15167](https://grafana.com/grafana/dashboards/15167)) as a monitoring visualization tool for the TDengine cluster. See the [TDinsight User Manual](/reference/tdinsight/) for installation and usage instructions.
On the data source configuration page, you can import the TDinsight dashboard for this data source as a monitoring visualization tool for the TDengine cluster. The dashboard is published in Grafana as [Dashboard 15167 - TDinsight](https://grafana.com/grafana/dashboards/15167). For other installation methods and usage instructions, see the [TDinsight User Manual](/reference/tdinsight/).
Other dashboards that use TDengine as a data source can be [searched for here](https://grafana.com/grafana/dashboards/?dataSource=tdengine-datasource). The following is a partial list:
- [15146](https://grafana.com/grafana/dashboards/15146): Monitor multiple TDengine clusters
- [15155](https://grafana.com/grafana/dashboards/15155): TDengine alert demo
- [15167](https://grafana.com/grafana/dashboards/15167): TDinsight
- [16388](https://grafana.com/grafana/dashboards/16388): Data display of node metrics collected by Telegraf
......@@ -14,12 +14,40 @@ In order for Grafana to add the TDengine data source successfully, the following
1. The TDengine cluster is deployed and functioning properly
2. taosAdapter is installed and running properly. Please refer to the taosAdapter manual for details.
Record these values:
- TDengine REST API URL, e.g. `http://tdengine.local:6041`.
- TDengine cluster authorization (username and password); a quick connectivity check is shown below.
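The REST endpoint can be verified with a one-line `curl` request. This is only an illustrative check that assumes the example URL above and the default `root`/`taosdata` credentials; use your own values:

```bash
# Run a trivial SQL statement through taosAdapter's REST interface
# (host, user, and password are placeholders).
curl -u root:taosdata -d "show databases;" http://tdengine.local:6041/rest/sql
```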
## Installing Grafana
TDengine currently supports Grafana versions 7.0 and above. Users can download the installation package for their operating system from the Grafana official website and install it. The download address is as follows: <https://grafana.com/grafana/download>.
TDengine currently supports Grafana versions 7.5 and above. Users can download the installation package for their operating system from the Grafana official website and install it. The download address is as follows: <https://grafana.com/grafana/download>.
## Configuring Grafana
### Option 1: Install with `install.sh`
Set the url and authorization environment variables by `export` or a [`.env`(dotenv) file](https://hexdocs.pm/dotenvy/dotenv-file-format.html):
```sh
export TDENGINE_API=http://tdengine.local:6041
# user + password
export TDENGINE_USER=user
export TDENGINE_PASSWORD=password
```
Run `install.sh`:
```sh
bash -c "$(curl -fsSL https://raw.githubusercontent.com/taosdata/grafanaplugin/master/install.sh)"
```
With this script, the TDengine data source plugin is installed and the Grafana data source is created automatically through Grafana provisioning configuration.
Then restart the Grafana service and open Grafana in a web browser, usually at <http://localhost:3000>.
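For example, on a systemd-based Linux host the restart might look like this (assuming the stock Grafana package and service name):

```bash
# Restart Grafana to load the provisioned plugin and data source.
sudo systemctl restart grafana-server
```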
### Option 2: Install Plugin Manually
Follow the [installation steps](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) on the Grafana plugin page with the [`grafana-cli` command-line tool](https://grafana.com/docs/grafana/latest/administration/cli/) to install the plugin.
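A typical invocation is sketched below; it may need to run as root or as the grafana user, and Grafana should be restarted afterwards:

```bash
# Install the TDengine data source plugin from the Grafana plugin catalog
# (assumes a standard package installation and default plugin directory).
sudo grafana-cli plugins install tdengine-datasource
```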
```bash
......@@ -50,11 +78,7 @@ If Grafana is running in a Docker environment, the TDengine plugin can be automa
GF_INSTALL_PLUGINS=tdengine-datasource
```
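For a Docker-based Grafana, the variable above can be supplied when starting the container; this sketch uses an illustrative container name, port mapping, and image tag:

```bash
# Start Grafana in Docker and install the TDengine plugin automatically on startup.
docker run -d --name grafana -p 3000:3000 \
  -e "GF_INSTALL_PLUGINS=tdengine-datasource" \
  grafana/grafana
```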
## Using Grafana
### Configuring Data Sources
Users can log in to the Grafana server (username/password: admin/admin) directly through the URL `http://localhost:3000` and add a datasource through `Configuration -> Data Sources` on the left side, as shown in the following figure.
Now users can log in to the Grafana server (username/password: admin/admin) directly through the URL `http://localhost:3000` and add a datasource through `Configuration -> Data Sources` on the left side, as shown in the following figure.
![TDengine Database TDinsight plugin add datasource 1](./grafana/add_datasource1.webp)
......@@ -94,4 +118,11 @@ Follow the default prompt to query the average system memory usage for the speci
### Importing the Dashboard
In version 2.3.3.0 and above, you can import the TDinsight Dashboard (Grafana Dashboard ID: [15167](https://grafana.com/grafana/dashboards/15167)) as a monitoring visualization tool for TDengine clusters. You can find installation and usage instructions in the [TDinsight User Manual](/reference/tdinsight/).
You can install the TDinsight dashboard from the data source configuration page (for example, `http://localhost:3000/datasources/edit/1/dashboards`) as a monitoring visualization tool for the TDengine cluster. The dashboard is published in Grafana as [Dashboard 15167 - TDinsight](https://grafana.com/grafana/dashboards/15167). Check the [TDinsight User Manual](/reference/tdinsight/) for details.
For more dashboards using the TDengine data source, [search here in Grafana](https://grafana.com/grafana/dashboards/?dataSource=tdengine-datasource). Here is a partial list:
- [15146](https://grafana.com/grafana/dashboards/15146): Monitor multiple TDengine clusters.
- [15155](https://grafana.com/grafana/dashboards/15155): TDengine alert demo.
- [15167](https://grafana.com/grafana/dashboards/15167): TDinsight.
- [16388](https://grafana.com/grafana/dashboards/16388): Telegraf node metrics dashboard using TDengine data source.
......@@ -194,6 +194,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param);
// Shuduo: temporary enable for app build
#if 1
......
......@@ -136,14 +136,14 @@ int indexRebuild(SIndex* index, SIndexOpts* opt);
* @param index (output, index json object)
* @return error code
*/
int tIndexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
int indexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
/*
* close index
* @param index (input, index to be closed)
* @return void
*/
void tIndexJsonClose(SIndexJson* index);
void indexJsonClose(SIndexJson* index);
/*
* insert terms into index
......@@ -152,7 +152,7 @@ void tIndexJsonClose(SIndexJson* index);
* @param uid (input, uid of terms)
* @return error code
*/
int tIndexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
int indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
/*
* search index
* @param index (input, index object)
......@@ -161,7 +161,7 @@ int tIndexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
* @return error code
*/
int tIndexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
int indexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
/*
* @param
* @param
......
......@@ -339,6 +339,7 @@ typedef struct {
int32_t sourceTaskId;
int32_t sourceVg;
int32_t sourceChildId;
int32_t upstreamNodeId;
#if 0
int64_t sourceVer;
#endif
......
......@@ -46,6 +46,7 @@ typedef struct SRpcHandleInfo {
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
SRpcConnInfo connInfo;
// app info
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
......
......@@ -171,6 +171,7 @@ typedef struct SReqResultInfo {
uint32_t current;
bool completed;
int32_t precision;
bool convertUcs4;
int32_t payloadLen;
} SReqResultInfo;
......@@ -222,7 +223,7 @@ typedef struct SSyncQueryParam {
SRequestObj* pRequest;
} SSyncQueryParam;
void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
......
......@@ -191,6 +191,8 @@ void *createRequest(STscObj *pObj, int32_t type) {
pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampUs();
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
pRequest->type = type;
pRequest->pTscObj = pObj;
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
......
......@@ -1114,7 +1114,7 @@ static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
tsem_post(&pParam->sem);
}
void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
assert(pRequest != NULL);
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
......@@ -1126,6 +1126,10 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
}
SSyncQueryParam* pParam = pRequest->body.param;
// convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4;
taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
tsem_wait(&pParam->sem);
}
......
......@@ -219,14 +219,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
#if SYNC_ON_TOP_OF_ASYNC
return doAsyncFetchRows(pRequest, true, true);
#else
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return NULL;
}
#if SYNC_ON_TOP_OF_ASYNC
return doAsyncFetchRow(pRequest, true, true);
#else
return doFetchRows(pRequest, true, true);
#endif
......@@ -489,6 +488,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL) {
return 0;
}
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
......@@ -501,7 +501,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
}
#if SYNC_ON_TOP_OF_ASYNC
doAsyncFetchRow(pRequest, false, true);
doAsyncFetchRows(pRequest, false, true);
#else
doFetchRows(pRequest, true, true);
#endif
......@@ -552,7 +552,11 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
return 0;
}
#if SYNC_ON_TOP_OF_ASYNC
doAsyncFetchRows(pRequest, false, false);
#else
doFetchRows(pRequest, false, false);
#endif
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
......@@ -771,11 +775,11 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
}
if (pRequest->code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
return;
}
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false);
pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->code = code;
......@@ -815,6 +819,13 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
}
void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
ASSERT(res != NULL && fp != NULL);
SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false;
taos_fetch_rows_a(res, fp, param);
}
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
void *param, int interval) {
// TODO
......
......@@ -494,7 +494,6 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
tmqAskEp(tmq, true);
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
/*tmq_commit(tmq, NULL, true);*/
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
......@@ -667,94 +666,6 @@ FAIL:
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
#if 0
// TODO: add read write lock
SRequestObj* pRequest = NULL;
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
// build msg
// send to mnode
SMqCMCommitOffsetReq req;
SArray* pOffsets = NULL;
if (offsets == NULL) {
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqOffset offset;
strcpy(offset.topicName, pTopic->topicName);
strcpy(offset.cgroup, tmq->groupId);
offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset;
taosArrayPush(pOffsets, &offset);
}
}
req.num = pOffsets->size;
req.offsets = pOffsets->pData;
} else {
req.num = taosArrayGetSize(&offsets->container);
req.offsets = (SMqOffset*)offsets->container.pData;
}
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos;
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
tEncoderClear(&encoder);
return -1;
}
tEncoderClear(&encoder);
tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tEncoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) {
tscError("failed to malloc request");
}
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) {
return -1;
}
pParam->tmq = tmq;
tsem_init(&pParam->rspSem, 0, 0);
pParam->async = async;
pParam->offsets = pOffsets;
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
resp = pParam->rspErr;
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
if (pOffsets) {
taosArrayDestroy(pOffsets);
}
}
return resp;
#endif
}
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
......@@ -859,93 +770,6 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
conf->commitCbUserParam = param;
}
#if 0
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
SQuery* pQueryNode = NULL;
char* astStr = NULL;
int32_t sqlLen;
terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || streamName == NULL || sql == NULL) {
tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
sqlLen = strlen(sql);
if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return;
}
tscDebug("start to create stream: %s", streamName);
int32_t code = 0;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
/*printf("%s\n", pStr);*/
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
strcpy(name.dbname, pRequest->pDb);
strcpy(name.tname, streamName);
SCMCreateStreamReq req = {
.igExists = 1,
.ast = astStr,
.sql = (char*)sql,
};
tNameExtractFullName(&name, req.name);
strcpy(req.targetStbFullName, tbName);
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
goto _return;
}
tSerializeSCMCreateStreamReq(buf, tlen, &req);
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
pRequest->type = TDMT_MND_CREATE_STREAM;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
_return:
taosMemoryFreeClear(astStr);
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
}
#endif
#if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0;
......@@ -1540,10 +1364,11 @@ const char* tmq_get_table_name(TAOS_RES* res) {
}
return NULL;
}
DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
tmqCommitInner(tmq, offsets, 0, 1, cb, param);
}
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
}
......@@ -684,7 +684,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
*/
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
// | total rows/total length | block group id | column schema | each column length |
return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) +
numOfCols * sizeof(int32_t);
}
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
......@@ -1893,12 +1894,12 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
uint64_t* groupId = (uint64_t*)data;
data += sizeof(uint64_t);
for(int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
*((int16_t*) data) = pColInfoData->info.type;
*((int16_t*)data) = pColInfoData->info.type;
data += sizeof(int16_t);
*((int32_t*) data) = pColInfoData->info.bytes;
*((int32_t*)data) = pColInfoData->info.bytes;
data += sizeof(int32_t);
}
......@@ -1944,6 +1945,8 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
blockDataEnsureCapacity(pBlock, numOfRows);
pBlock->info.rows = numOfRows;
const char* pStart = pData;
int32_t dataLen = *(int32_t*)pStart;
......@@ -1952,13 +1955,25 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
pBlock->info.groupId = *(uint64_t*)pStart;
pStart += sizeof(uint64_t);
for(int32_t i = 0; i < numOfCols; ++i) {
if (pBlock->pDataBlock == NULL) {
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArraySetSize(pBlock->pDataBlock, numOfCols);
}
pBlock->info.numOfCols = taosArrayGetSize(pBlock->pDataBlock);
ASSERT(pBlock->pDataBlock->size >= numOfCols);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
pColInfoData->info.type = *(int16_t*)pStart;
pStart += sizeof(int16_t);
pColInfoData->info.bytes = *(int32_t*)pStart;
pStart += sizeof(int32_t);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pBlock->info.hasVarCol = true;
}
}
blockDataEnsureCapacity(pBlock, numOfRows);
......@@ -1983,11 +1998,17 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
}
} else {
if (pColInfoData->nullbitmap == NULL) {
pColInfoData->nullbitmap = taosMemoryCalloc(1, BitmapLen(numOfRows));
}
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
pStart += BitmapLen(numOfRows);
}
if (colLen[i] > 0) {
if (pColInfoData->pData == NULL) {
pColInfoData->pData = taosMemoryCalloc(1, colLen[i]);
}
memcpy(pColInfoData->pData, pStart, colLen[i]);
}
......
......@@ -352,7 +352,9 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -22,17 +22,17 @@ static void dmSendRsp(SRpcMsg *pMsg);
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
return -1;
}
SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo);
// if (IsReq(pRpc)) {
// terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
// dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
// return -1;
//}
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
memcpy(pMsg->conn.user, connInfo.user, TSDB_USER_LEN);
pMsg->conn.clientIp = connInfo.clientIp;
pMsg->conn.clientPort = connInfo.clientPort;
memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN);
pMsg->conn.clientIp = pConnInfo->clientIp;
pMsg->conn.clientPort = pConnInfo->clientPort;
return 0;
}
......@@ -49,9 +49,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
SDnodeTrans * pTrans = &pDnode->trans;
int32_t code = -1;
SRpcMsg *pMsg = NULL;
SRpcMsg * pMsg = NULL;
SMgmtWrapper *pWrapper = NULL;
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
......@@ -167,11 +167,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SArray *pArray = (*pWrapper->func.getHandlesFp)();
SArray * pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId;
......
......@@ -99,7 +99,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
indexMultiTermAdd(terms, term);
}
}
tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexMultiTermDestroy(terms);
#endif
return 0;
......
......@@ -448,7 +448,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = pMsg->pCont;
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
......
......@@ -737,8 +737,8 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
STimeWindow win = {
.skey = INT64_MIN,
.ekey = INT64_MAX,
.skey = INT64_MIN,
.ekey = INT64_MAX,
};
bool needRead = false;
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) {
......@@ -846,7 +846,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
size_t total = taosArrayGetSize(pInfo->pBlockLists);
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
/*doClearBufferedBlocks(pInfo);*/
pOperator->status = OP_EXEC_DONE;
return NULL;
}
......
......@@ -131,7 +131,7 @@ typedef struct TFileCacheKey {
char* colName;
int32_t nColName;
} ICacheKey;
int indexFlushCacheToTFile(SIndex* sIdx, void*, bool quit);
int idxFlushCacheToTFile(SIndex* sIdx, void*, bool quit);
int64_t indexAddRef(void* p);
int32_t indexRemoveRef(int64_t ref);
......
......@@ -455,7 +455,7 @@ static void idxDestroyFinalRslt(SArray* result) {
taosArrayDestroy(result);
}
int indexFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
if (sIdx == NULL) {
return -1;
}
......
......@@ -69,7 +69,7 @@ static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRsl
cacheSearchRange_JSON}};
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);
static bool idxCacheIteratorNext(Iterate* itera);
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
if (cache == NULL) {
......@@ -476,7 +476,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
iiter->val.colVal = NULL;
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
iiter->next = indexCacheIteratorNext;
iiter->next = idxCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue;
taosThreadMutexUnlock(&cache->mtx);
......@@ -748,9 +748,9 @@ static void doMergeWork(SSchedMsg* msg) {
int quit = msg->thandle ? true : false;
taosMemoryFree(msg->thandle);
indexFlushCacheToTFile(sidx, pCache, quit);
idxFlushCacheToTFile(sidx, pCache, quit);
}
static bool indexCacheIteratorNext(Iterate* itera) {
static bool idxCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
if (iter == NULL) {
return false;
......
......@@ -355,7 +355,7 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
indexMultiTermQueryAdd(mtm, tm, qtype);
ret = tIndexJsonSearch(arg->ivtIdx, mtm, output->result);
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
} else {
bool reverse;
Filter filterFunc = sifGetFilterFunc(qtype, &reverse);
......
......@@ -15,11 +15,11 @@
#include "index.h"
#include "indexInt.h"
int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
int indexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
// handle
return indexOpen(opts, path, index);
}
int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
if (p->colType == TSDB_DATA_TYPE_BOOL) {
......@@ -36,7 +36,7 @@ int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
return indexPut(index, terms, uid);
}
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
int indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
SArray *terms = tq->query;
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
......@@ -54,7 +54,7 @@ int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *re
return indexSearch(index, tq, result);
}
void tIndexJsonClose(SIndexJson *index) {
void indexJsonClose(SIndexJson *index) {
// handle close
return indexClose(index);
}
......@@ -56,11 +56,11 @@ class JsonEnv : public ::testing::Test {
initLog();
opts = indexOptsCreate();
int ret = tIndexJsonOpen(opts, dir.c_str(), &index);
int ret = indexJsonOpen(opts, dir.c_str(), &index);
assert(ret == 0);
}
virtual void TearDown() {
tIndexJsonClose(index);
indexJsonClose(index);
indexOptsDestroy(opts);
printf("destory\n");
taosMsleep(1000);
......@@ -75,7 +75,7 @@ static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtyp
(const char*)data, dlen);
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, (int64_t)tableId);
indexJsonPut(index, terms, (int64_t)tableId);
indexMultiTermDestroy(terms);
}
......@@ -86,7 +86,7 @@ static void delData(SIndexJson* index, const std::string& colName, int8_t dtype,
(const char*)data, dlen);
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, (int64_t)tableId);
indexJsonPut(index, terms, (int64_t)tableId);
indexMultiTermDestroy(terms);
}
......@@ -99,7 +99,7 @@ static void Search(SIndexJson* index, const std::string& colNam, int8_t dtype, v
SArray* res = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, (EIndexQueryType)filterType);
tIndexJsonSearch(index, mq, res);
indexJsonSearch(index, mq, res);
indexMultiTermQueryDestroy(mq);
*result = res;
}
......@@ -112,7 +112,7 @@ TEST_F(JsonEnv, testWrite) {
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -125,7 +125,7 @@ TEST_F(JsonEnv, testWrite) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -138,7 +138,7 @@ TEST_F(JsonEnv, testWrite) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -152,7 +152,7 @@ TEST_F(JsonEnv, testWrite) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(100, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -167,7 +167,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -182,7 +182,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -196,7 +196,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -210,7 +210,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(10, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -225,7 +225,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -240,7 +240,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(10, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -258,7 +258,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -271,7 +271,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -284,7 +284,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -297,7 +297,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -310,7 +310,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -324,7 +324,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -339,7 +339,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -354,7 +354,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -369,7 +369,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -385,7 +385,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -398,7 +398,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -412,7 +412,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -426,7 +426,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -441,7 +441,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -455,7 +455,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -469,7 +469,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -483,7 +483,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -498,7 +498,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -511,7 +511,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i + 1000);
indexJsonPut(index, terms, i + 1000);
indexMultiTermDestroy(terms);
}
}
......@@ -526,7 +526,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(2000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......
......@@ -59,7 +59,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
// rsp by input status
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg);
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = status;
pCont->streamId = pReq->streamId;
......@@ -78,7 +78,18 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
streamExec(pTask, pMsgCb);
if (pTask->execType != TASK_EXEC__NONE) {
streamExec(pTask, pMsgCb);
} else {
ASSERT(pTask->sinkType != TASK_SINK__NONE);
while (1) {
void* data = streamQueueNextItem(pTask->inputQueue);
if (data == NULL) return 0;
if (streamTaskOutput(pTask, data) < 0) {
ASSERT(0);
}
}
}
// 3. handle output
// 3.1 check and set status
......
......@@ -22,6 +22,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
......@@ -42,6 +43,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
ASSERT(pReq->blockNum > 0);
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
......@@ -94,6 +96,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.sourceTaskId = pTask->taskId,
.sourceVg = data->sourceVg,
.sourceChildId = pTask->childId,
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
};
......@@ -184,13 +187,17 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
#endif
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) return 0;
if (pBlock == NULL) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0;
}
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1;
}
......
......@@ -65,6 +65,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
/*qRes->sourceVg = pTask->nodeId;*/
if (streamTaskOutput(pTask, qRes) < 0) {
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
......
......@@ -307,6 +307,13 @@ static void uvHandleReq(SSvrConn* pConn) {
if (pHead->noResp == 1) {
transMsg.info.refId = -1;
}
// set up conn info
SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo);
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
transReleaseExHandle(refMgt, pConn->refId);
STrans* pTransInst = pConn->pTransInst;
......@@ -1153,23 +1160,6 @@ _return2:
rpcFreeCont(msg->pCont);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
if (thandle == NULL) {
tTrace("invalid handle %p, failed to Get Conn info", thandle);
return -1;
}
SExHandle* ex = thandle;
SSvrConn* pConn = ex->handle;
if (pConn == NULL) {
tTrace("invalid handle %p, failed to Get Conn info", thandle);
return -1;
}
struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
pInfo->clientPort = ntohs(addr.sin_port);
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
return 0;
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
#endif