“d7ae2bdef307b995ec9b8427c4d668f819645dcd”上不存在“projects/dockpanelsuite”
提交 c7d35784 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-22694

...@@ -273,22 +273,16 @@ password: taosdata ...@@ -273,22 +273,16 @@ password: taosdata
## Start the TDengine cluster with docker-compose ## Start the TDengine cluster with docker-compose
1. The following docker-compose file starts a TDengine cluster with two replicas, two management nodes, two data nodes, and one arbitrator. 1. The following docker-compose file starts a TDengine cluster with three nodes.
```docker ```yml
version: "3" version: "3"
services: services:
arbitrator:
image: tdengine/tdengine:$VERSION
command: tarbitrator
td-1: td-1:
image: tdengine/tdengine:$VERSION image: tdengine/tdengine:$VERSION
environment: environment:
TAOS_FQDN: "td-1" TAOS_FQDN: "td-1"
TAOS_FIRST_EP: "td-1" TAOS_FIRST_EP: "td-1"
TAOS_NUM_OF_MNODES: "2"
TAOS_REPLICA: "2"
TAOS_ARBITRATOR: arbitrator:6042
volumes: volumes:
- taosdata-td1:/var/lib/taos/ - taosdata-td1:/var/lib/taos/
- taoslog-td1:/var/log/taos/ - taoslog-td1:/var/log/taos/
...@@ -297,25 +291,30 @@ password: taosdata ...@@ -297,25 +291,30 @@ password: taosdata
environment: environment:
TAOS_FQDN: "td-2" TAOS_FQDN: "td-2"
TAOS_FIRST_EP: "td-1" TAOS_FIRST_EP: "td-1"
TAOS_NUM_OF_MNODES: "2"
TAOS_REPLICA: "2"
TAOS_ARBITRATOR: arbitrator:6042
volumes: volumes:
- taosdata-td2:/var/lib/taos/ - taosdata-td2:/var/lib/taos/
- taoslog-td2:/var/log/taos/ - taoslog-td2:/var/log/taos/
td-3:
image: tdengine/tdengine:$VERSION
environment:
TAOS_FQDN: "td-3"
TAOS_FIRST_EP: "td-1"
volumes: volumes:
- taosdata-td3:/var/lib/taos/
- taoslog-td3:/var/log/taos/
volumes:
taosdata-td1: taosdata-td1:
taoslog-td1: taoslog-td1:
taosdata-td2: taosdata-td2:
taoslog-td2: taoslog-td2:
``` taosdata-td3:
taoslog-td3:
```
:::note :::note
- The `VERSION` environment variable is used to set the tdengine image tag - The `VERSION` environment variable is used to set the tdengine image tag
- `TAOS_FIRST_EP` must be set on the newly created instance so that it can join the TDengine cluster; if there is a high availability requirement, `TAOS_SECOND_EP` needs to be used at the same time - `TAOS_FIRST_EP` must be set on the newly created instance so that it can join the TDengine cluster; if there is a high availability requirement, `TAOS_SECOND_EP` needs to be used at the same time
- `TAOS_REPLICA` is used to set the default number of database replicas. Its value range is [1,3]
We recommend setting it with `TAOS_ARBITRATOR` to use arbitrator in a two-nodes environment.
::: :::
2. Start the cluster 2. Start the cluster
...@@ -345,17 +344,18 @@ password: taosdata ...@@ -345,17 +344,18 @@ password: taosdata
4. Show dnodes via TDengine CLI 4. Show dnodes via TDengine CLI
```shell ```shell
$ docker-compose exec td-1 taos -s "show dnodes" $ docker-compose exec td-1 taos -s "show dnodes"
taos> show dnodes taos> show dnodes
id | end_point | vnodes | cores | status | role | create_time | offline reason | id | endpoint | vnodes | support_vnodes | status | create_time | note |
====================================================================================================================================== ======================================================================================================================================
1 | td-1:6030 | 1 | 8 | ready | any | 2022-01-18 02:47:42.871 | | 1 | td-1:6030 | 0 | 32 | ready | 2022-08-19 07:57:29.971 | |
2 | td-2:6030 | 0 | 8 | ready | any | 2022-01-18 02:47:43.518 | | 2 | td-2:6030 | 0 | 32 | ready | 2022-08-19 07:57:31.415 | |
0 | arbitrator:6042 | 0 | 0 | ready | arb | 2022-01-18 02:47:43.633 | - | 3 | td-3:6030 | 0 | 32 | ready | 2022-08-19 07:57:31.417 | |
Query OK, 3 row(s) in set (0.000811s) Query OK, 3 rows in database (0.021262s)
```
```
## taosAdapter ## taosAdapter
...@@ -373,19 +373,13 @@ password: taosdata ...@@ -373,19 +373,13 @@ password: taosdata
Suppose you want to deploy multiple taosAdapters to improve throughput and provide high availability. In that case, the recommended configuration method uses a reverse proxy such as Nginx to offer a unified access entry. For specific configuration methods, please refer to the official documentation of Nginx. Here is an example: Suppose you want to deploy multiple taosAdapters to improve throughput and provide high availability. In that case, the recommended configuration method uses a reverse proxy such as Nginx to offer a unified access entry. For specific configuration methods, please refer to the official documentation of Nginx. Here is an example:
```docker ```yml
version: "3" version: "3"
networks: networks:
inter: inter:
api:
services: services:
arbitrator:
image: tdengine/tdengine:$VERSION
command: tarbitrator
networks:
- inter
td-1: td-1:
image: tdengine/tdengine:$VERSION image: tdengine/tdengine:$VERSION
networks: networks:
...@@ -393,9 +387,6 @@ password: taosdata ...@@ -393,9 +387,6 @@ password: taosdata
environment: environment:
TAOS_FQDN: "td-1" TAOS_FQDN: "td-1"
TAOS_FIRST_EP: "td-1" TAOS_FIRST_EP: "td-1"
TAOS_NUM_OF_MNODES: "2"
TAOS_REPLICA: "2"
TAOS_ARBITRATOR: arbitrator:6042
volumes: volumes:
- taosdata-td1:/var/lib/taos/ - taosdata-td1:/var/lib/taos/
- taoslog-td1:/var/log/taos/ - taoslog-td1:/var/log/taos/
...@@ -406,15 +397,12 @@ password: taosdata ...@@ -406,15 +397,12 @@ password: taosdata
environment: environment:
TAOS_FQDN: "td-2" TAOS_FQDN: "td-2"
TAOS_FIRST_EP: "td-1" TAOS_FIRST_EP: "td-1"
TAOS_NUM_OF_MNODES: "2"
TAOS_REPLICA: "2"
TAOS_ARBITRATOR: arbitrator:6042
volumes: volumes:
- taosdata-td2:/var/lib/taos/ - taosdata-td2:/var/lib/taos/
- taoslog-td2:/var/log/taos/ - taoslog-td2:/var/log/taos/
adapter: adapter:
image: tdengine/tdengine:$VERSION image: tdengine/tdengine:$VERSION
command: taosadapter entrypoint: "taosadapter"
networks: networks:
- inter - inter
environment: environment:
...@@ -428,7 +416,6 @@ password: taosdata ...@@ -428,7 +416,6 @@ password: taosdata
- adapter - adapter
networks: networks:
- inter - inter
- api
ports: ports:
- 6041:6041 - 6041:6041
- 6044:6044/udp - 6044:6044/udp
...@@ -444,12 +431,12 @@ password: taosdata ...@@ -444,12 +431,12 @@ password: taosdata
>> /etc/nginx/nginx.conf;cat /etc/nginx/nginx.conf; >> /etc/nginx/nginx.conf;cat /etc/nginx/nginx.conf;
nginx -g 'daemon off;'", nginx -g 'daemon off;'",
] ]
volumes: volumes:
taosdata-td1: taosdata-td1:
taoslog-td1: taoslog-td1:
taosdata-td2: taosdata-td2:
taoslog-td2: taoslog-td2:
``` ```
## Deploy with docker swarm ## Deploy with docker swarm
......
...@@ -92,7 +92,7 @@ int main(int argc, char *argv[]) ...@@ -92,7 +92,7 @@ int main(int argc, char *argv[])
} }
// a simple way to parse input parameters // a simple way to parse input parameters
if (argc >= 3) strcpy(db, argv[2]); if (argc >= 3) strncpy(db, argv[2], sizeof(db) - 1);
if (argc >= 4) points = atoi(argv[3]); if (argc >= 4) points = atoi(argv[3]);
if (argc >= 5) numOfTables = atoi(argv[4]); if (argc >= 5) numOfTables = atoi(argv[4]);
......
...@@ -106,7 +106,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData); ...@@ -106,7 +106,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
// SRow ================================ // SRow ================================
int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow); int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow);
void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow); void tRowDestroy(SRow *pRow);
void tRowSort(SArray *aRowP); void tRowSort(SArray *aRowP);
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
......
...@@ -172,8 +172,8 @@ enum { ...@@ -172,8 +172,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
......
...@@ -175,20 +175,24 @@ typedef struct { ...@@ -175,20 +175,24 @@ typedef struct {
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
#if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes); bool streamQueueResEmpty(const SStreamQueueRes* pRes);
int64_t streamQueueResSize(const SStreamQueueRes* pRes); int64_t streamQueueResSize(const SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes); SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes); SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes);
void streamQueueResClear(SStreamQueueRes* pRes); void streamQueueResClear(SStreamQueueRes* pRes);
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode); SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode);
#endif
typedef struct { typedef struct {
SStreamQueueNode* pHead; SStreamQueueNode* pHead;
} SStreamQueue1; } SStreamQueue1;
#if 0
bool streamQueueHasTask(const SStreamQueue1* pQueue); bool streamQueueHasTask(const SStreamQueue1* pQueue);
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem); int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
#endif
typedef struct { typedef struct {
STaosQueue* queue; STaosQueue* queue;
...@@ -636,7 +640,7 @@ void streamMetaClose(SStreamMeta* streamMeta); ...@@ -636,7 +640,7 @@ void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); // SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
......
...@@ -119,6 +119,7 @@ int32_t* taosGetErrno(); ...@@ -119,6 +119,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) //
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) //
#define TSDB_CODE_IVLD_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132) //
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
......
此差异已折叠。
version: "3"
networks:
inter:
api:
services:
arbitrator:
image: tdengine/tdengine:$VERSION
command: tarbitrator
networks:
- inter
td-1:
image: tdengine/tdengine:$VERSION
networks:
- inter
environment:
TAOS_FQDN: "td-1"
TAOS_FIRST_EP: "td-1"
TAOS_NUM_OF_MNODES: "2"
TAOS_REPLICA: "2"
TAOS_ARBITRATOR: arbitrator:6042
volumes:
- taosdata-td1:/var/lib/taos/
- taoslog-td1:/var/log/taos/
td-2:
image: tdengine/tdengine:$VERSION
networks:
- inter
environment:
TAOS_FQDN: "td-2"
TAOS_FIRST_EP: "td-1"
TAOS_NUM_OF_MNODES: "2"
TAOS_REPLICA: "2"
TAOS_ARBITRATOR: arbitrator:6042
volumes:
- taosdata-td2:/var/lib/taos/
- taoslog-td2:/var/log/taos/
adapter:
image: tdengine/tdengine:$VERSION
command: taosadapter
networks:
- inter
environment:
TAOS_FIRST_EP: "td-1"
TOAS_SECOND_EP: "td-2"
deploy:
replicas: 4
update_config:
parallelism: 4
nginx:
image: nginx
depends_on:
- adapter
networks:
- inter
- api
ports:
- 6041:6041
- 6044:6044/udp
command: [
"sh",
"-c",
"while true;
do curl -s http://adapter:6041/-/ping >/dev/null && break;
done;
printf 'server{listen 6041;location /{proxy_pass http://adapter:6041;}}'
> /etc/nginx/conf.d/rest.conf;
printf 'stream{server{listen 6044 udp;proxy_pass adapter:6044;}}'
>> /etc/nginx/nginx.conf;cat /etc/nginx/nginx.conf;
nginx -g 'daemon off;'",
]
volumes:
taosdata-td1:
taoslog-td1:
taosdata-td2:
taoslog-td2:
...@@ -73,7 +73,7 @@ fi ...@@ -73,7 +73,7 @@ fi
username="tdengine" username="tdengine"
# generate docker verison # generate docker version
echo "generate ${dockerim}:${version}" echo "generate ${dockerim}:${version}"
docker manifest create -a ${dockerim}:${version} ${dockeramd64}:${version} ${dockeraarch64}:${version} docker manifest create -a ${dockerim}:${version} ${dockeramd64}:${version} ${dockeraarch64}:${version}
docker manifest inspect ${dockerim}:${version} docker manifest inspect ${dockerim}:${version}
......
...@@ -74,7 +74,7 @@ do ...@@ -74,7 +74,7 @@ do
done done
# Check_verison() # Check_version()
# { # {
# } # }
...@@ -102,14 +102,14 @@ scriptDir=$(dirname $(readlink -f $0)) ...@@ -102,14 +102,14 @@ scriptDir=$(dirname $(readlink -f $0))
communityDir=${scriptDir}/../../../community communityDir=${scriptDir}/../../../community
DockerfilePath=${communityDir}/packaging/docker/ DockerfilePath=${communityDir}/packaging/docker/
if [ "$cloudBuild" == "y" ]; then if [ "$cloudBuild" == "y" ]; then
comunityArchiveDir=/nas/TDengine/v$version/cloud communityArchiveDir=/nas/TDengine/v$version/cloud
Dockerfile=${communityDir}/packaging/docker/DockerfileCloud Dockerfile=${communityDir}/packaging/docker/DockerfileCloud
else else
comunityArchiveDir=/nas/TDengine/v$version/community communityArchiveDir=/nas/TDengine/v$version/community
Dockerfile=${communityDir}/packaging/docker/Dockerfile Dockerfile=${communityDir}/packaging/docker/Dockerfile
fi fi
cd ${scriptDir} cd ${scriptDir}
cp -f ${comunityArchiveDir}/${pkgFile} . cp -f ${communityArchiveDir}/${pkgFile} .
echo "dirName=${dirName}" echo "dirName=${dirName}"
......
...@@ -456,12 +456,13 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { ...@@ -456,12 +456,13 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS);
blockDataDestroy(pBlock);
if(len != rspSize - sizeof(SRetrieveTableRsp)){ if(len != rspSize - sizeof(SRetrieveTableRsp)){
uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len, (uint64_t) (rspSize - sizeof(SRetrieveTableRsp))); uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len, (uint64_t) (rspSize - sizeof(SRetrieveTableRsp)));
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
blockDataDestroy(pBlock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -139,7 +139,10 @@ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) { ...@@ -139,7 +139,10 @@ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) {
nkv += tPutI16v(NULL, -pTColumn->colId); nkv += tPutI16v(NULL, -pTColumn->colId);
nIdx++; nIdx++;
} else { } else {
ASSERT(0); if (ASSERTS(0, "invalid input")) {
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
} }
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
...@@ -176,8 +179,10 @@ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) { ...@@ -176,8 +179,10 @@ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) {
ntp = sizeof(SRow) + BIT2_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntp; ntp = sizeof(SRow) + BIT2_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntp;
break; break;
default: default:
ASSERT(0); if (ASSERTS(0, "impossible")) {
break; code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
} }
if (maxIdx <= UINT8_MAX) { if (maxIdx <= UINT8_MAX) {
nkv = sizeof(SRow) + sizeof(SKVIdx) + nIdx + nkv; nkv = sizeof(SRow) + sizeof(SKVIdx) + nIdx + nkv;
...@@ -306,8 +311,10 @@ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) { ...@@ -306,8 +311,10 @@ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) {
pv = pf + pTSchema->flen; pv = pf + pTSchema->flen;
break; break;
default: default:
ASSERT(0); if (ASSERTS(0, "impossible")) {
break; code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
} }
if (pb) { if (pb) {
...@@ -370,7 +377,7 @@ _exit: ...@@ -370,7 +377,7 @@ _exit:
return code; return code;
} }
void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
ASSERT(iCol < pTSchema->numOfCols); ASSERT(iCol < pTSchema->numOfCols);
ASSERT(pRow->sver == pTSchema->version); ASSERT(pRow->sver == pTSchema->version);
...@@ -381,17 +388,17 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { ...@@ -381,17 +388,17 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
pColVal->type = pTColumn->type; pColVal->type = pTColumn->type;
pColVal->flag = CV_FLAG_VALUE; pColVal->flag = CV_FLAG_VALUE;
memcpy(&pColVal->value.val, &pRow->ts, sizeof(TSKEY)); memcpy(&pColVal->value.val, &pRow->ts, sizeof(TSKEY));
return; return 0;
} }
if (pRow->flag == HAS_NONE) { if (pRow->flag == HAS_NONE) {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return; return 0;
} }
if (pRow->flag == HAS_NULL) { if (pRow->flag == HAS_NULL) {
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type); *pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return; return 0;
} }
if (pRow->flag >> 4) { // KV Row if (pRow->flag >> 4) { // KV Row
...@@ -440,7 +447,7 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { ...@@ -440,7 +447,7 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
memcpy(&pColVal->value.val, pData, pTColumn->bytes); memcpy(&pColVal->value.val, pData, pTColumn->bytes);
} }
} }
return; return 0;
} else if (TABS(cid) < pTColumn->colId) { } else if (TABS(cid) < pTColumn->colId) {
lidx = mid + 1; lidx = mid + 1;
} else { } else {
...@@ -492,16 +499,16 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { ...@@ -492,16 +499,16 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
pv = pf + pTSchema->flen; pv = pf + pTSchema->flen;
break; break;
default: default:
ASSERT(0); ASSERTS(0, "invalid row format");
break; return TSDB_CODE_IVLD_DATA_FMT;
} }
if (bv == BIT_FLG_NONE) { if (bv == BIT_FLG_NONE) {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return; return 0;
} else if (bv == BIT_FLG_NULL) { } else if (bv == BIT_FLG_NULL) {
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type); *pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return; return 0;
} }
pColVal->cid = pTColumn->colId; pColVal->cid = pTColumn->colId;
...@@ -520,6 +527,8 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { ...@@ -520,6 +527,8 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
} }
} }
} }
return 0;
} }
void tRowDestroy(SRow *pRow) { void tRowDestroy(SRow *pRow) {
...@@ -710,7 +719,6 @@ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter) { ...@@ -710,7 +719,6 @@ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter) {
_exit: _exit:
if (code) { if (code) {
*ppIter = NULL; *ppIter = NULL;
if (pIter) taosMemoryFree(pIter);
} else { } else {
*ppIter = pIter; *ppIter = pIter;
} }
...@@ -929,8 +937,8 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData * ...@@ -929,8 +937,8 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *
pv = pf + pTSchema->flen; pv = pf + pTSchema->flen;
break; break;
default: default:
ASSERT(0); ASSERTS(0, "Invalid row flag");
break; return TSDB_CODE_IVLD_DATA_FMT;
} }
while (pColData) { while (pColData) {
...@@ -954,8 +962,8 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData * ...@@ -954,8 +962,8 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *
bv = GET_BIT2(pb, iTColumn - 1); bv = GET_BIT2(pb, iTColumn - 1);
break; break;
default: default:
ASSERT(0); ASSERTS(0, "Invalid row flag");
break; return TSDB_CODE_IVLD_DATA_FMT;
} }
if (bv == BIT_FLG_NONE) { if (bv == BIT_FLG_NONE) {
...@@ -1045,7 +1053,8 @@ static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aCo ...@@ -1045,7 +1053,8 @@ static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aCo
} else if (pRow->flag & KV_FLG_BIG) { } else if (pRow->flag & KV_FLG_BIG) {
pData = pv + ((uint32_t *)pKVIdx->idx)[iCol]; pData = pv + ((uint32_t *)pKVIdx->idx)[iCol];
} else { } else {
ASSERT(0); ASSERTS(0, "Invalid KV row format");
return TSDB_CODE_IVLD_DATA_FMT;
} }
int16_t cid; int16_t cid;
...@@ -1579,7 +1588,7 @@ static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData, ...@@ -1579,7 +1588,7 @@ static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData,
int32_t code = 0; int32_t code = 0;
if (IS_VAR_DATA_TYPE(pColData->type)) { if (IS_VAR_DATA_TYPE(pColData->type)) {
code = tRealloc((uint8_t **)(&pColData->aOffset), (pColData->nVal + 1) << 2); code = tRealloc((uint8_t **)(&pColData->aOffset), ((int64_t)(pColData->nVal + 1)) << 2);
if (code) goto _exit; if (code) goto _exit;
pColData->aOffset[pColData->nVal] = pColData->nData; pColData->aOffset[pColData->nVal] = pColData->nData;
...@@ -2347,35 +2356,25 @@ void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) { ...@@ -2347,35 +2356,25 @@ void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
} }
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal) { uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal) {
uint8_t v;
switch (pColData->flag) { switch (pColData->flag) {
case HAS_NONE: case HAS_NONE:
v = 0; return 0;
break;
case HAS_NULL: case HAS_NULL:
v = 1; return 1;
break;
case (HAS_NULL | HAS_NONE): case (HAS_NULL | HAS_NONE):
v = GET_BIT1(pColData->pBitMap, iVal); return GET_BIT1(pColData->pBitMap, iVal);
break;
case HAS_VALUE: case HAS_VALUE:
v = 2; return 2;
break;
case (HAS_VALUE | HAS_NONE): case (HAS_VALUE | HAS_NONE):
v = GET_BIT1(pColData->pBitMap, iVal); return (GET_BIT1(pColData->pBitMap, iVal)) ? 2 : 0;
if (v) v = 2;
break;
case (HAS_VALUE | HAS_NULL): case (HAS_VALUE | HAS_NULL):
v = GET_BIT1(pColData->pBitMap, iVal) + 1; return GET_BIT1(pColData->pBitMap, iVal) + 1;
break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): case (HAS_VALUE | HAS_NULL | HAS_NONE):
v = GET_BIT2(pColData->pBitMap, iVal); return GET_BIT2(pColData->pBitMap, iVal);
break;
default: default:
ASSERT(0); ASSERTS(0, "not possible");
break; return 0;
} }
return v;
} }
int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg) { int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg) {
......
...@@ -638,7 +638,7 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb ...@@ -638,7 +638,7 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
} }
int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) { int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) {
// impl later // impl later
int32_t code = 0; int32_t code = -1;
SStbObj newStb = {0}; SStbObj newStb = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb-index"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb-index");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
...@@ -670,6 +670,7 @@ _OVER: ...@@ -670,6 +670,7 @@ _OVER:
} }
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb) { static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb) {
int32_t code = -1;
SIdxObj idxObj = {0}; SIdxObj idxObj = {0};
memcpy(idxObj.name, req->idxName, TSDB_TABLE_FNAME_LEN); memcpy(idxObj.name, req->idxName, TSDB_TABLE_FNAME_LEN);
memcpy(idxObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN); memcpy(idxObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
...@@ -681,21 +682,6 @@ static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *re ...@@ -681,21 +682,6 @@ static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *re
idxObj.stbUid = pStb->uid; idxObj.stbUid = pStb->uid;
idxObj.dbUid = pStb->dbUid; idxObj.dbUid = pStb->dbUid;
int32_t code = -1;
// SField *pField0 = NULL;
// SStbObj stbObj = {0};
// SStbObj *pNew = &stbObj;
// taosRLockLatch(&pOld->lock);
// memcpy(&stbObj, pOld, sizeof(SStbObj));
// taosRUnLockLatch(&pOld->lock);
// stbObj.pColumns = NULL;
// stbObj.pTags = NULL;
// stbObj.updateTime = taosGetTimestampMs();
// stbObj.lock = 0;
int32_t tag = mndFindSuperTableTagId(pStb, req->colName); int32_t tag = mndFindSuperTableTagId(pStb, req->colName);
if (tag < 0) { if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST; terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
......
...@@ -133,6 +133,7 @@ static void mndCalMqRebalance(SMnode *pMnode) { ...@@ -133,6 +133,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
} }
} }
#if 0
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, sec); void *pReq = mndBuildCheckpointTickMsg(&contLen, sec);
...@@ -145,6 +146,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { ...@@ -145,6 +146,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
} }
#endif
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
mTrace("pullup telem msg"); mTrace("pullup telem msg");
......
...@@ -39,7 +39,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj ...@@ -39,7 +39,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); // static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/ /*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
...@@ -66,8 +66,8 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -66,8 +66,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
...@@ -778,6 +778,9 @@ _OVER: ...@@ -778,6 +778,9 @@ _OVER:
tFreeStreamObj(&streamObj); tFreeStreamObj(&streamObj);
return code; return code;
} }
#if 0
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -942,6 +945,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { ...@@ -942,6 +945,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
return 0; return 0;
} }
#endif
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
......
...@@ -328,10 +328,6 @@ static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData) ...@@ -328,10 +328,6 @@ static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData)
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
if (pData) {
taosMemoryFree(pData);
pData = NULL;
}
} }
*ppData = pData; *ppData = pData;
return code; return code;
...@@ -404,7 +400,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) ...@@ -404,7 +400,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData)
} }
while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) { while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) {
if (taosArrayPush(pReader->aDelData, &pDelInfo->delData) < 0) { if (taosArrayPush(pReader->aDelData, &pDelInfo->delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -1252,7 +1248,7 @@ static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId, ...@@ -1252,7 +1248,7 @@ static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId,
SDelData delData; SDelData delData;
n += tGetDelData(pData + n, &delData); n += tGetDelData(pData + n, &delData);
if (taosArrayPush(pWriter->aDelData, &delData) < 0) { if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -1420,6 +1416,7 @@ _exit: ...@@ -1420,6 +1416,7 @@ _exit:
tBlockDataDestroy(&pWriter->bData); tBlockDataDestroy(&pWriter->bData);
tBlockDataDestroy(&pWriter->inData); tBlockDataDestroy(&pWriter->inData);
tsdbFSDestroy(&pWriter->fs); tsdbFSDestroy(&pWriter->fs);
taosMemoryFree(pWriter);
pWriter = NULL; pWriter = NULL;
} }
} else { } else {
......
...@@ -116,12 +116,7 @@ int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItem ...@@ -116,12 +116,7 @@ int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItem
} }
_exit: _exit:
if (code) {
*ppArray = NULL;
if (pArray) taosArrayDestroy(pArray);
} else {
*ppArray = pArray; *ppArray = pArray;
}
return code; return code;
} }
...@@ -1233,14 +1228,22 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, ...@@ -1233,14 +1228,22 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema,
int32_t iColumn = 1; int32_t iColumn = 1;
STColumn *pTColumn = &pTSchema->columns[iColumn]; STColumn *pTColumn = &pTSchema->columns[iColumn];
for (int32_t iCid = 0; iCid < nCid; iCid++) { for (int32_t iCid = 0; iCid < nCid; iCid++) {
ASSERT(pTColumn); if (ASSERTS(pTColumn != NULL, "invalid input param")) {
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
while (pTColumn->colId < aCid[iCid]) { while (pTColumn->colId < aCid[iCid]) {
iColumn++; iColumn++;
ASSERT(iColumn < pTSchema->numOfCols); ASSERT(iColumn < pTSchema->numOfCols);
pTColumn = &pTSchema->columns[iColumn]; pTColumn = &pTSchema->columns[iColumn];
} }
ASSERT(pTColumn->colId == aCid[iCid]); if (ASSERTS(pTColumn->colId == aCid[iCid], "invalid input param")) {
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
tColDataInit(&pBlockData->aColData[iCid], pTColumn->colId, pTColumn->type, tColDataInit(&pBlockData->aColData[iCid], pTColumn->colId, pTColumn->type,
(pTColumn->flags & COL_SMA_ON) ? 1 : 0); (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
......
...@@ -31,7 +31,9 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 ...@@ -31,7 +31,9 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
int32_t affectedrows = 0; int32_t affectedrows = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
ASSERT(pTsdb->mem != NULL); if (ASSERTS(pTsdb->mem != NULL, "vgId:%d, mem is NULL", TD_VID(pTsdb->pVnode))) {
return -1;
}
if (pMsg) { if (pMsg) {
arrSize = taosArrayGetSize(pMsg->aSubmitTbData); arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
......
...@@ -36,8 +36,8 @@ static int32_t vnodeCompactTask(void *param) { ...@@ -36,8 +36,8 @@ static int32_t vnodeCompactTask(void *param) {
vnodeCommitInfo(dir); vnodeCommitInfo(dir);
_exit: _exit:
taosMemoryFree(pInfo);
tsem_post(&pInfo->pVnode->canCommit); tsem_post(&pInfo->pVnode->canCommit);
taosMemoryFree(pInfo);
return code; return code;
} }
static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) { static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) {
...@@ -59,9 +59,17 @@ static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) { ...@@ -59,9 +59,17 @@ static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
} }
vnodeLoadInfo(dir, &info); if (vnodeLoadInfo(dir, &info) < 0) {
code = terrno;
goto _exit;
}
info.state.commitID = pInfo->commitID; info.state.commitID = pInfo->commitID;
vnodeSaveInfo(dir, &info);
if (vnodeSaveInfo(dir, &info) < 0) {
code = terrno;
goto _exit;
}
_exit: _exit:
if (code) { if (code) {
......
...@@ -48,7 +48,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -48,7 +48,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info.state.applied = -1; info.state.applied = -1;
info.state.commitID = 0; info.state.commitID = 0;
vInfo("vgId:%d, save config while create", pCfg->vgId); vInfo("vgId:%d, save config while create", info.config.vgId);
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) { if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) {
vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno)); vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno));
return -1; return -1;
...@@ -124,7 +124,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr ...@@ -124,7 +124,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
while (1) { while (1) {
const STfsFile *tsdbFile = tfsReaddir(tsdbDir); const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
if (tsdbFile == NULL) break; if (tsdbFile == NULL) break;
if (tsdbFile->rname == NULL) continue; if (tsdbFile->rname[0] == '\0') continue;
tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN); tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN);
char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix); char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix);
......
...@@ -141,7 +141,10 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int ...@@ -141,7 +141,10 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
*(int64_t *)(pCoder->data + pCoder->pos) = uid; *(int64_t *)(pCoder->data + pCoder->pos) = uid;
pCoder->pos += sizeof(int64_t); pCoder->pos += sizeof(int64_t);
} else { } else {
tDecodeI64(pCoder, &submitTbData.uid); if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) { if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
...@@ -168,6 +171,11 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int ...@@ -168,6 +171,11 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
SColData colData = {0}; SColData colData = {0};
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData); pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
if (colData.flag != HAS_VALUE) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
for (int32_t iRow = 0; iRow < colData.nVal; iRow++) { for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) { if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
......
...@@ -493,11 +493,9 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCt ...@@ -493,11 +493,9 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCt
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
if (tbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock); CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
taosHashRelease(dbCache->tbCache, tbCache); taosHashRelease(dbCache->tbCache, tbCache);
*pTb = NULL; *pTb = NULL;
}
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
ctx->tbInfo.tbType, dbFName); ctx->tbInfo.tbType, dbFName);
...@@ -1554,8 +1552,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam ...@@ -1554,8 +1552,8 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
SCtgTbCache cache = {0}; SCtgTbCache cache = {0};
cache.pMeta = meta; cache.pMeta = meta;
if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) { if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) {
taosMemoryFree(meta);
ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
taosMemoryFree(meta);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
......
...@@ -5953,6 +5953,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol ...@@ -5953,6 +5953,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool hasPrimaryKey = false;
SNode* pCol = NULL; SNode* pCol = NULL;
SNode* pProj = NULL; SNode* pProj = NULL;
FORBOTH(pCol, pCols, pProj, *pProjections) { FORBOTH(pCol, pCols, pProj, *pProjections) {
...@@ -5966,6 +5967,14 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol ...@@ -5966,6 +5967,14 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList* pCol
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
break; break;
} }
if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
hasPrimaryKey = true;
}
}
if (TSDB_CODE_SUCCESS == code && !hasPrimaryKey) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM,
"primary timestamp column can not be null");
} }
SNodeList* pNewProjections = NULL; SNodeList* pNewProjections = NULL;
...@@ -6008,7 +6017,15 @@ static int32_t adjustProjectionsForExistTable(STranslateContext* pCxt, SCreateSt ...@@ -6008,7 +6017,15 @@ static int32_t adjustProjectionsForExistTable(STranslateContext* pCxt, SCreateSt
return adjustOrderOfProjections(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList, pReq); return adjustOrderOfProjections(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList, pReq);
} }
static bool isGroupIdTagStream(const STableMeta* pMeta, SNodeList* pTags) {
return (NULL == pTags && 1 == pMeta->tableInfo.numOfTags && TSDB_DATA_TYPE_UBIGINT == getTableTagSchema(pMeta)->type);
}
static int32_t adjustDataTypeOfTags(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pTags) { static int32_t adjustDataTypeOfTags(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pTags) {
if (isGroupIdTagStream(pMeta, pTags)) {
return TSDB_CODE_SUCCESS;
}
if (getNumOfTags(pMeta) != LIST_LENGTH(pTags)) { if (getNumOfTags(pMeta) != LIST_LENGTH(pTags)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of tags"); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of tags");
} }
......
...@@ -847,6 +847,9 @@ _return: ...@@ -847,6 +847,9 @@ _return:
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
qwMsg->connInfo.handle, code, tstrerror(code), dataLen); qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else {
qwFreeFetchRsp(rsp);
rsp = NULL;
} }
} }
...@@ -1217,7 +1220,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { ...@@ -1217,7 +1220,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} else { } else {
qwDropTask(QW_FPARAMS()); (void)qwDropTask(QW_FPARAMS());
} }
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#if 0
#include "streamInc.h" #include "streamInc.h"
int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
...@@ -192,3 +193,4 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre ...@@ -192,3 +193,4 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre
// set status normal // set status normal
return 0; return 0;
} }
#endif
...@@ -26,7 +26,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock ...@@ -26,7 +26,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
for (int32_t i = 0; i < blockNum; i++) { for (int32_t i = 0; i < blockNum; i++) {
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockDecode(pDataBlock, pRetrieve->data); blockDecode(pDataBlock, pRetrieve->data);
// TODO: refactor // TODO: refactor
......
...@@ -114,12 +114,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -114,12 +114,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
taosArrayDestroy(pRes);
return 0; return 0;
} }
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
taosArrayDestroy(pRes);
return -1; return -1;
} }
if (output == NULL) { if (output == NULL) {
......
...@@ -171,6 +171,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { ...@@ -171,6 +171,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
} }
#endif #endif
#if 0
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
...@@ -180,6 +181,7 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -180,6 +181,7 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
return NULL; return NULL;
} }
} }
#endif
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
......
...@@ -46,6 +46,7 @@ void streamQueueClose(SStreamQueue* queue) { ...@@ -46,6 +46,7 @@ void streamQueueClose(SStreamQueue* queue) {
taosMemoryFree(queue); taosMemoryFree(queue);
} }
#if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes) { bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
// //
return true; return true;
...@@ -101,3 +102,4 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { ...@@ -101,3 +102,4 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
if (pNode) return streamQueueBuildRes(pNode); if (pNode) return streamQueueBuildRes(pNode);
return (SStreamQueueRes){0}; return (SStreamQueueRes){0};
} }
#endif
...@@ -97,6 +97,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") ...@@ -97,6 +97,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
TAOS_DEFINE_ERROR(TSDB_CODE_IVLD_DATA_FMT, "Invalid data format")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册