diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 0aa48ef2aa167318c37d92c7c92917ebc28a7f84..783193db49f00d60341134c3496dab8e6edbab7b 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -36,6 +36,7 @@ typedef struct SReadHandle {
void* vnode;
void* mnd;
SMsgCb* pMsgCb;
+ int64_t version;
bool initMetaReader;
bool initTableReader;
bool initTqReader;
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 48fa2d79382f59aa058d9455f7dbc22af0ac03ad..e2d75d39e34c9077591101e636255027e4b672d7 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -264,15 +264,12 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
static int32_t emptyRspNum = 0;
- if (code != 0) {
- taosMemoryFreeClear(param);
- return -1;
- }
-
char *key = (char *)param;
SClientHbBatchRsp pRsp = {0};
- tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
-
+ if (TSDB_CODE_SUCCESS == code) {
+ tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
+ }
+
int32_t rspNum = taosArrayGetSize(pRsp.rsps);
taosThreadMutexLock(&appInfo.mutex);
@@ -288,6 +285,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosMemoryFreeClear(param);
+ if (code != 0) {
+ (*pInst)->onlineDnodes = 0;
+ }
+
if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c
index 614174d97ae54acf64bb80d67c5dc3882409de04..03f3ef95c539b547cb441a8094443b4cdfb97334 100644
--- a/source/client/src/tmq.c
+++ b/source/client/src/tmq.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include "cJSON.h"
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
@@ -23,7 +24,6 @@
#include "tqueue.h"
#include "tref.h"
#include "ttimer.h"
-#include "cJSON.h"
int32_t tmqAskEp(tmq_t* tmq, bool async);
@@ -106,8 +106,8 @@ struct tmq_t {
tsem_t rspSem;
};
-struct tmq_raw_data{
- void *raw_meta;
+struct tmq_raw_data {
+ void* raw_meta;
int32_t raw_meta_len;
int16_t raw_meta_type;
};
@@ -953,6 +953,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto FAIL;
}
+ tscInfo("consumer %ld is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
+
return pTmq;
FAIL:
@@ -1194,10 +1196,10 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
- char buf[50];
- tFormatOffset(buf, 50, &pVgCur->currentOffsetNew);
- tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch, pVgCur->vgId,
- vgKey, buf);
+ char buf[80];
+ tFormatOffset(buf, 80, &pVgCur->currentOffsetNew);
+ tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
+ pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
}
}
@@ -1564,7 +1566,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
- tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
+ tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
+ vgSkipCnt);
continue;
/*if (vgSkipCnt < 10000) continue;*/
#if 0
@@ -1620,8 +1623,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
- tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64, tmq->consumerId,
- pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
+ tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
+ tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
@@ -1669,7 +1672,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
- /*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
+ /*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
+ * rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) {
@@ -1691,7 +1695,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
- /*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
+ /*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
+ * rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset;
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
@@ -1848,9 +1853,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL;
}
-tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) {
+tmq_raw_data* tmq_get_raw_meta(TAOS_RES* res) {
if (TD_RES_TMQ_META(res)) {
- tmq_raw_data *raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
+ tmq_raw_data* raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
@@ -1860,7 +1865,8 @@ tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) {
return NULL;
}
-static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
+static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
+ int8_t t) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
@@ -1870,31 +1876,31 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch
cJSON_AddItemToObject(json, "type", type);
char uid[32] = {0};
- sprintf(uid, "%"PRIi64, id);
+ sprintf(uid, "%" PRIi64, id);
cJSON* id_ = cJSON_CreateString(uid);
cJSON_AddItemToObject(json, "id", id_);
cJSON* tableName = cJSON_CreateString(name);
cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
cJSON_AddItemToObject(json, "tableType", tableType);
-// cJSON* version = cJSON_CreateNumber(1);
-// cJSON_AddItemToObject(json, "version", version);
+ // cJSON* version = cJSON_CreateNumber(1);
+ // cJSON_AddItemToObject(json, "version", version);
cJSON* columns = cJSON_CreateArray();
- for(int i = 0; i < schemaRow->nCols; i++){
- cJSON* column = cJSON_CreateObject();
- SSchema *s = schemaRow->pSchema + i;
- cJSON* cname = cJSON_CreateString(s->name);
+ for (int i = 0; i < schemaRow->nCols; i++) {
+ cJSON* column = cJSON_CreateObject();
+ SSchema* s = schemaRow->pSchema + i;
+ cJSON* cname = cJSON_CreateString(s->name);
cJSON_AddItemToObject(column, "name", cname);
cJSON* ctype = cJSON_CreateNumber(s->type);
cJSON_AddItemToObject(column, "type", ctype);
- if(s->type == TSDB_DATA_TYPE_BINARY){
+ if (s->type == TSDB_DATA_TYPE_BINARY) {
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
- cJSON* cbytes = cJSON_CreateNumber(length);
+ cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes);
- }else if (s->type == TSDB_DATA_TYPE_NCHAR){
- int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
- cJSON* cbytes = cJSON_CreateNumber(length);
+ } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
+ int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
+ cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes);
}
cJSON_AddItemToArray(columns, column);
@@ -1902,20 +1908,20 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch
cJSON_AddItemToObject(json, "columns", columns);
cJSON* tags = cJSON_CreateArray();
- for(int i = 0; schemaTag && i < schemaTag->nCols; i++){
- cJSON* tag = cJSON_CreateObject();
- SSchema *s = schemaTag->pSchema + i;
- cJSON* tname = cJSON_CreateString(s->name);
+ for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
+ cJSON* tag = cJSON_CreateObject();
+ SSchema* s = schemaTag->pSchema + i;
+ cJSON* tname = cJSON_CreateString(s->name);
cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(s->type);
cJSON_AddItemToObject(tag, "type", ttype);
- if(s->type == TSDB_DATA_TYPE_BINARY){
+ if (s->type == TSDB_DATA_TYPE_BINARY) {
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
- cJSON* cbytes = cJSON_CreateNumber(length);
+ cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(tag, "length", cbytes);
- }else if (s->type == TSDB_DATA_TYPE_NCHAR){
- int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
- cJSON* cbytes = cJSON_CreateNumber(length);
+ } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
+ int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
+ cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(tag, "length", cbytes);
}
cJSON_AddItemToArray(tags, tag);
@@ -1927,13 +1933,13 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch
return string;
}
-static char *processCreateStb(SMqMetaRsp *metaRsp){
+static char* processCreateStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0};
SDecoder coder;
- char* string = NULL;
+ char* string = NULL;
// decode and process req
- void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
@@ -1949,7 +1955,7 @@ _err:
return string;
}
-static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){
+static char* buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
@@ -1958,7 +1964,7 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t
cJSON* type = cJSON_CreateString("create");
cJSON_AddItemToObject(json, "type", type);
char cid[32] = {0};
- sprintf(cid, "%"PRIi64, id);
+ sprintf(cid, "%" PRIi64, id);
cJSON* cid_ = cJSON_CreateString(cid);
cJSON_AddItemToObject(json, "id", cid_);
@@ -1968,19 +1974,19 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t
cJSON_AddItemToObject(json, "tableType", tableType);
char sid_[32] = {0};
- sprintf(sid_, "%"PRIi64, sid);
+ sprintf(sid_, "%" PRIi64, sid);
cJSON* using = cJSON_CreateString(sid_);
cJSON_AddItemToObject(json, "using", using);
-// cJSON* version = cJSON_CreateNumber(1);
-// cJSON_AddItemToObject(json, "version", version);
+ // cJSON* version = cJSON_CreateNumber(1);
+ // cJSON_AddItemToObject(json, "version", version);
cJSON* tags = cJSON_CreateArray();
- if (tTagIsJson(pTag)) { // todo
+ if (tTagIsJson(pTag)) { // todo
char* pJson = parseTagDatatoJson(pTag);
cJSON* tag = cJSON_CreateObject();
- cJSON* tname = cJSON_CreateString("unknown"); // todo
+ cJSON* tname = cJSON_CreateString("unknown"); // todo
cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
cJSON_AddItemToObject(tag, "type", ttype);
@@ -1999,12 +2005,12 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t
goto end;
}
- for(int i = 0; i < taosArrayGetSize(pTagVals); i++){
+ for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
cJSON* tag = cJSON_CreateObject();
-// cJSON* tname = cJSON_CreateNumber(pTagVal->cid);
- cJSON* tname = cJSON_CreateString("unkonwn"); // todo
+ // cJSON* tname = cJSON_CreateNumber(pTagVal->cid);
+ cJSON* tname = cJSON_CreateString("unkonwn"); // todo
cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
cJSON_AddItemToObject(tag, "type", ttype);
@@ -2032,13 +2038,13 @@ end:
return string;
}
-static char *processCreateTable(SMqMetaRsp *metaRsp){
+static char* processCreateTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0};
SVCreateTbBatchReq req = {0};
- SVCreateTbReq *pCreateReq;
- char *string = NULL;
+ SVCreateTbReq* pCreateReq;
+ char* string = NULL;
// decode
- void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
@@ -2048,27 +2054,29 @@ static char *processCreateTable(SMqMetaRsp *metaRsp){
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
- if(pCreateReq->type == TSDB_CHILD_TABLE){
- string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid);
- }else if(pCreateReq->type == TSDB_NORMAL_TABLE){
- string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
+ if (pCreateReq->type == TSDB_CHILD_TABLE) {
+ string =
+ buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid);
+ } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
+ string =
+ buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
}
}
tDecoderClear(&decoder);
- _exit:
+_exit:
tDecoderClear(&decoder);
return string;
}
-static char *processAlterTable(SMqMetaRsp *metaRsp){
- SDecoder decoder = {0};
- SVAlterTbReq vAlterTbReq = {0};
- char *string = NULL;
+static char* processAlterTable(SMqMetaRsp* metaRsp) {
+ SDecoder decoder = {0};
+ SVAlterTbReq vAlterTbReq = {0};
+ char* string = NULL;
// decode
- void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
@@ -2081,8 +2089,8 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
}
cJSON* type = cJSON_CreateString("alter");
cJSON_AddItemToObject(json, "type", type);
-// cJSON* uid = cJSON_CreateNumber(id);
-// cJSON_AddItemToObject(json, "uid", uid);
+ // cJSON* uid = cJSON_CreateNumber(id);
+ // cJSON_AddItemToObject(json, "uid", uid);
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
@@ -2097,43 +2105,43 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
cJSON_AddItemToObject(json, "colType", colType);
- if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){
+ if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
- cJSON* cbytes = cJSON_CreateNumber(length);
+ cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
- }else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){
- int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
- cJSON* cbytes = cJSON_CreateNumber(length);
+ } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
+ int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
+ cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
}
break;
}
- case TSDB_ALTER_TABLE_DROP_COLUMN:{
+ case TSDB_ALTER_TABLE_DROP_COLUMN: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName);
break;
}
- case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{
+ case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName);
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
cJSON_AddItemToObject(json, "colType", colType);
- if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){
+ if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
- cJSON* cbytes = cJSON_CreateNumber(length);
+ cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
- }else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){
- int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
- cJSON* cbytes = cJSON_CreateNumber(length);
+ } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
+ int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
+ cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
}
break;
}
- case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{
+ case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
@@ -2142,12 +2150,12 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
cJSON_AddItemToObject(json, "colNewName", colNewName);
break;
}
- case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:{
+ case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_TAG_VAL);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
cJSON_AddItemToObject(json, "colName", tagName);
- cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo
+ cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo
cJSON_AddItemToObject(json, "colValue", colValue);
cJSON* isNull = cJSON_CreateBool(vAlterTbReq.isNull);
cJSON_AddItemToObject(json, "colValueNull", isNull);
@@ -2158,18 +2166,18 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
}
string = cJSON_PrintUnformatted(json);
- _exit:
+_exit:
tDecoderClear(&decoder);
return string;
}
-static char *processDropSTable(SMqMetaRsp *metaRsp){
- SDecoder decoder = {0};
- SVDropStbReq req = {0};
- char *string = NULL;
+static char* processDropSTable(SMqMetaRsp* metaRsp) {
+ SDecoder decoder = {0};
+ SVDropStbReq req = {0};
+ char* string = NULL;
// decode
- void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
@@ -2183,7 +2191,7 @@ static char *processDropSTable(SMqMetaRsp *metaRsp){
cJSON* type = cJSON_CreateString("drop");
cJSON_AddItemToObject(json, "type", type);
char uid[32] = {0};
- sprintf(uid, "%"PRIi64, req.suid);
+ sprintf(uid, "%" PRIi64, req.suid);
cJSON* id = cJSON_CreateString(uid);
cJSON_AddItemToObject(json, "id", id);
cJSON* tableName = cJSON_CreateString(req.name);
@@ -2193,18 +2201,18 @@ static char *processDropSTable(SMqMetaRsp *metaRsp){
string = cJSON_PrintUnformatted(json);
- _exit:
+_exit:
tDecoderClear(&decoder);
return string;
}
-static char *processDropTable(SMqMetaRsp *metaRsp){
- SDecoder decoder = {0};
- SVDropTbBatchReq req = {0};
- char *string = NULL;
+static char* processDropTable(SMqMetaRsp* metaRsp) {
+ SDecoder decoder = {0};
+ SVDropTbBatchReq req = {0};
+ char* string = NULL;
// decode
- void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
@@ -2217,67 +2225,67 @@ static char *processDropTable(SMqMetaRsp *metaRsp){
}
cJSON* type = cJSON_CreateString("drop");
cJSON_AddItemToObject(json, "type", type);
-// cJSON* uid = cJSON_CreateNumber(id);
-// cJSON_AddItemToObject(json, "uid", uid);
-// cJSON* tableType = cJSON_CreateString("normal");
-// cJSON_AddItemToObject(json, "tableType", tableType);
+ // cJSON* uid = cJSON_CreateNumber(id);
+ // cJSON_AddItemToObject(json, "uid", uid);
+ // cJSON* tableType = cJSON_CreateString("normal");
+ // cJSON_AddItemToObject(json, "tableType", tableType);
cJSON* tableNameList = cJSON_CreateArray();
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq* pDropTbReq = req.pReqs + iReq;
- cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo
+ cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo
cJSON_AddItemToArray(tableNameList, tableName);
}
cJSON_AddItemToObject(json, "tableNameList", tableNameList);
string = cJSON_PrintUnformatted(json);
- _exit:
+_exit:
tDecoderClear(&decoder);
return string;
}
-char *tmq_get_json_meta(TAOS_RES *res){
+char* tmq_get_json_meta(TAOS_RES* res) {
if (!TD_RES_TMQ_META(res)) {
return NULL;
}
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
- if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){
+ if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
return processCreateStb(&pMetaRspObj->metaRsp);
- }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){
+ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) {
return processCreateStb(&pMetaRspObj->metaRsp);
- }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){
+ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) {
return processDropSTable(&pMetaRspObj->metaRsp);
- }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){
+ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) {
return processCreateTable(&pMetaRspObj->metaRsp);
- }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE){
+ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) {
return processAlterTable(&pMetaRspObj->metaRsp);
- }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE){
+ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
return processDropTable(&pMetaRspObj->metaRsp);
}
return NULL;
}
-static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
+static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
SVCreateStbReq req = {0};
SDecoder coder;
SMCreateStbReq pReq = {0};
- int32_t code = TSDB_CODE_SUCCESS;
- SRequestObj* pRequest = NULL;
+ int32_t code = TSDB_CODE_SUCCESS;
+ SRequestObj* pRequest = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
- if(!pRequest->pDb){
+ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
- void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
@@ -2286,16 +2294,16 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
}
// build create stable
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
- for(int32_t i = 0; i < req.schemaRow.nCols; i++){
+ for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
SSchema* pSchema = req.schemaRow.pSchema + i;
- SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
+ SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pColumns, &field);
}
pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
- for(int32_t i = 0; i < req.schemaTag.nCols; i++){
+ for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
SSchema* pSchema = req.schemaTag.pSchema + i;
- SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
+ SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pTags, &field);
}
@@ -2323,7 +2331,7 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
}
tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
- SQuery pQuery = {0};
+ SQuery pQuery = {0};
pQuery.execMode = QUERY_EXEC_MODE_RPC;
pQuery.pCmdMsg = &pCmdMsg;
pQuery.msgType = pQuery.pCmdMsg->msgType;
@@ -2333,18 +2341,18 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
- end:
+end:
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder);
return code;
}
-static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
+static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
SVDropStbReq req = {0};
SDecoder coder;
SMDropStbReq pReq = {0};
- int32_t code = TSDB_CODE_SUCCESS;
+ int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
@@ -2352,12 +2360,12 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
- if(!pRequest->pDb){
+ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
- void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVDropStbReq(&coder, &req) < 0) {
@@ -2386,7 +2394,7 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
}
tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
- SQuery pQuery = {0};
+ SQuery pQuery = {0};
pQuery.execMode = QUERY_EXEC_MODE_RPC;
pQuery.pCmdMsg = &pCmdMsg;
pQuery.msgType = pQuery.pCmdMsg->msgType;
@@ -2396,7 +2404,7 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
- end:
+end:
destroyRequest(pRequest);
tDecoderClear(&coder);
return code;
@@ -2409,7 +2417,7 @@ typedef struct SVgroupCreateTableBatch {
} SVgroupCreateTableBatch;
static void destroyCreateTbReqBatch(void* data) {
- SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*) data;
+ SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
taosArrayDestroy(pTbBatch->req.pArray);
}
@@ -2426,12 +2434,12 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
- if(!pRequest->pDb){
+ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
- void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
@@ -2456,15 +2464,15 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
- .requestId = pRequest->requestId,
- .requestObjRefId = pRequest->self,
- .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
+ .requestId = pRequest->requestId,
+ .requestObjRefId = pRequest->self,
+ .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0};
- SName pName;
+ SName pName;
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
@@ -2496,7 +2504,7 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_CREATE_TABLE;
pQuery->stableQuery = false;
- pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
+ pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (code != TSDB_CODE_SUCCESS) {
@@ -2504,10 +2512,10 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
}
launchQueryImpl(pRequest, pQuery, false, NULL);
- pQuery = NULL; // no need to free in the end
- code = pRequest->code;
+ pQuery = NULL; // no need to free in the end
+ code = pRequest->code;
- end:
+end:
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
@@ -2539,12 +2547,12 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
- if(!pRequest->pDb){
+ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
- void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
@@ -2569,15 +2577,15 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
- .requestId = pRequest->requestId,
- .requestObjRefId = pRequest->self,
- .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
+ .requestId = pRequest->requestId,
+ .requestObjRefId = pRequest->self,
+ .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0};
- SName pName;
+ SName pName;
toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
@@ -2607,7 +2615,7 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_DROP_TABLE;
pQuery->stableQuery = false;
- pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
+ pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (code != TSDB_CODE_SUCCESS) {
@@ -2615,10 +2623,10 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
}
launchQueryImpl(pRequest, pQuery, false, NULL);
- pQuery = NULL; // no need to free in the end
- code = pRequest->code;
+ pQuery = NULL; // no need to free in the end
+ code = pRequest->code;
- end:
+end:
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
@@ -2641,12 +2649,12 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
- if(!pRequest->pDb){
+ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
- void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
+ void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
@@ -2655,7 +2663,7 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
}
// do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
- if(req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS){
+ if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
goto end;
}
@@ -2667,12 +2675,12 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
}
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
- .requestId = pRequest->requestId,
- .requestObjRefId = pRequest->self,
- .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
+ .requestId = pRequest->requestId,
+ .requestObjRefId = pRequest->self,
+ .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0};
- SName pName = {0};
+ SName pName = {0};
toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
@@ -2706,7 +2714,7 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_ALTER_TABLE;
pQuery->stableQuery = false;
- pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
+ pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
if (code != TSDB_CODE_SUCCESS) {
@@ -2714,14 +2722,14 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
}
launchQueryImpl(pRequest, pQuery, false, NULL);
- pQuery = NULL; // no need to free in the end
+ pQuery = NULL; // no need to free in the end
pVgData = NULL;
- pArray = NULL;
- code = pRequest->code;
+ pArray = NULL;
+ code = pRequest->code;
end:
taosArrayDestroy(pArray);
- if(pVgData) taosMemoryFreeClear(pVgData->pData);
+ if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData);
destroyRequest(pRequest);
tDecoderClear(&coder);
@@ -2729,22 +2737,22 @@ end:
return code;
}
-int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){
+int32_t taos_write_raw_meta(TAOS* taos, tmq_raw_data* raw_meta) {
if (!taos || !raw_meta) {
return TSDB_CODE_INVALID_PARA;
}
- if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) {
+ if (raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
- }else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){
+ } else if (raw_meta->raw_meta_type == TDMT_VND_ALTER_STB) {
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
- }else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){
+ } else if (raw_meta->raw_meta_type == TDMT_VND_DROP_STB) {
return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
- }else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){
+ } else if (raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE) {
return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
- }else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE){
+ } else if (raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE) {
return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
- }else if(raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE){
+ } else if (raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE) {
return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}
return TSDB_CODE_INVALID_PARA;
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index 224c1dbb5dcc9488123e0dcc75a14ea118a5054a..8ce22c2b2e663bbeaf735628de612af7f9fc5b01 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -45,6 +45,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
+static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp);
int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = {
@@ -854,6 +855,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SMCreateStbReq createReq = {0};
+ bool isAlter = false;
if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
@@ -889,6 +891,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
code = 0;
goto _OVER;
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
+ isAlter = true;
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
} else {
mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
@@ -929,7 +932,12 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto _OVER;
}
- code = mndCreateStb(pMnode, pReq, &createReq, pDb);
+ if (isAlter) {
+ bool needRsp = false;
+ code = mndAlterStbImp(pMnode, pReq, pDb, pStb, needRsp);
+ } else {
+ code = mndCreateStb(pMnode, pReq, &createReq, pDb);
+ }
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
@@ -1495,14 +1503,13 @@ static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *t
return code;
}
-static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont,
- int32_t *pLen) {
+static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, int32_t *pLen) {
int32_t ret;
SEncoder ec = {0};
uint32_t contLen = 0;
SMAlterStbRsp alterRsp = {0};
SName name = {0};
- tNameFromString(&name, pAlter->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
+ tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == alterRsp.pMeta) {
@@ -1535,10 +1542,36 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
return 0;
}
+static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp) {
+ int32_t code = -1;
+ STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
+ if (pTrans == NULL) goto _OVER;
+
+ mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
+ mndTransSetDbName(pTrans, pDb->name, NULL);
+
+ if (needRsp) {
+ void *pCont = NULL;
+ int32_t contLen = 0;
+ if (mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen) != 0) goto _OVER;
+ mndTransSetRpcRsp(pTrans, pCont, contLen);
+ }
+
+ if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
+ if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
+ if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
+ if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
+
+ code = 0;
+
+_OVER:
+ mndTransDrop(pTrans);
+ return code;
+}
+
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
bool needRsp = true;
int32_t code = -1;
- STrans *pTrans = NULL;
SField *pField0 = NULL;
SStbObj stbObj = {0};
@@ -1587,30 +1620,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
}
if (code != 0) goto _OVER;
-
- code = -1;
- pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
- if (pTrans == NULL) goto _OVER;
-
- mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
- mndTransSetDbName(pTrans, pDb->name, NULL);
-
- if (needRsp) {
- void *pCont = NULL;
- int32_t contLen = 0;
- if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER;
- mndTransSetRpcRsp(pTrans, pCont, contLen);
- }
-
- if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
- if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
- if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
- if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
-
- code = 0;
+ code = mndAlterStbImp(pMnode, pReq, pDb, &stbObj, needRsp);
_OVER:
- mndTransDrop(pTrans);
taosMemoryFreeClear(stbObj.pTags);
taosMemoryFreeClear(stbObj.pColumns);
return code;
diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h
index b2ea08d50de0410d691a11c8196f6db4083278bf..8abaac6dffb15a656d78cd4454001ae8d28b8277 100644
--- a/source/dnode/vnode/src/inc/tq.h
+++ b/source/dnode/vnode/src/inc/tq.h
@@ -89,6 +89,8 @@ typedef struct {
STqExecTb execTb;
STqExecDb execDb;
};
+ // TODO remove it
+ int64_t tsdbEndVer;
} STqExecHandle;
@@ -129,7 +131,7 @@ typedef struct {
static STqMgmt tqMgmt = {0};
// tqRead
-int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
+int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 0c72d35027e921a4614581850c95902cc8a38eae..ae0f7f56a2cccff84580453fc06e64e5650f73c6 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -112,7 +112,8 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
};
tmsgSendRsp(&resp);
- tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64,
+ tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64
+ ", rspOffset:%" PRId64,
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset);
return 0;
@@ -179,8 +180,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
tDecoderClear(&decoder);
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) {
- tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, offset.subKey,
- TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
+ tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
+ offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
} else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.version);
@@ -316,9 +317,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
// 3.query
- if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
- fetchOffsetNew.version++;
- if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
+ if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
+ if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
+ fetchOffsetNew.version++;
+ }
+ if (tqScan(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
code = -1;
goto OVER;
@@ -333,7 +336,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto OVER;
}
- if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
+ if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
@@ -411,6 +414,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
taosMemoryFree(pCkHead);
+#if 0
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
@@ -421,6 +425,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
+#endif
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0);
}
@@ -478,6 +483,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
/*for (int32_t i = 0; i < 5; i++) {*/
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
+ int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg;
req.qmsg = NULL;
@@ -488,6 +494,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
+ .version = ver,
};
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]);
@@ -496,6 +503,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(scanner);
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader[i]);
+ pHandle->execHandle.tsdbEndVer = ver;
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) {
diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c
index 4ecd88b1fddf6613800348a2789b96145a7988d9..3ee274ced1733a6604d5d8720cabc40604d4d98e 100644
--- a/source/dnode/vnode/src/tq/tqExec.c
+++ b/source/dnode/vnode/src/tq/tqExec.c
@@ -59,15 +59,17 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return 0;
}
-int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
+int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan(task, pOffset) < 0) {
+ ASSERT(pOffset->type == TMQ_OFFSET__LOG);
pRsp->rspOffset = *pOffset;
pRsp->rspOffset.version--;
return 0;
}
+ int32_t rowCnt = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
@@ -77,11 +79,26 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp);
+ pRsp->blockNum++;
if (pRsp->withTbName) {
- int64_t uid = pExec->pExecReader[0]->msgIter.uid;
- tqAddTbNameToRsp(pTq, uid, pRsp);
+ if (pOffset->type == TMQ_OFFSET__LOG) {
+ int64_t uid = pExec->pExecReader[0]->msgIter.uid;
+ tqAddTbNameToRsp(pTq, uid, pRsp);
+ } else {
+ pRsp->withTbName = 0;
+ }
}
- pRsp->blockNum++;
+ if (pOffset->type == TMQ_OFFSET__LOG) {
+ continue;
+ } else {
+ rowCnt += pDataBlock->info.rows;
+ if (rowCnt <= 4096) continue;
+ }
+ }
+
+ if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
+ tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1);
+ qStreamPrepareScan(task, pOffset);
continue;
}
@@ -94,18 +111,19 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
ASSERT(0);
}
- if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
+ ASSERT(pRsp->rspOffset.type != 0);
+
+ if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
}
- ASSERT(pRsp->rspOffset.type != 0);
-
break;
}
return 0;
}
+#if 0
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId];
@@ -153,6 +171,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
return 0;
}
+#endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 8d0ae4785b0211977b3f2aecd84b8b7168477144..d49c307233a8d3a2cb3e272b0a53c1953726d4a0 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -478,7 +478,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
SVnode *pVnode = pFsm->data;
- int32_t code = vnodeSnapWriterClose(pWriter, isApply);
+ int32_t code = vnodeSnapWriterClose(pWriter, !isApply);
return code;
}
diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h
index 9d0e3871cc5d428fcb0497eafacd2bbe00271b3b..7637c66b8428adf2aea5826a2de1a8aa8f99aea5 100644
--- a/source/libs/catalog/inc/catalogInt.h
+++ b/source/libs/catalog/inc/catalogInt.h
@@ -166,7 +166,7 @@ typedef struct SCtgDBCache {
int8_t deleted;
SCtgVgCache vgCache;
SHashObj *tbCache; // key:tbname, value:SCtgTbCache
- SHashObj *stbCache; // key:suid, value:STableMeta*
+ SHashObj *stbCache; // key:suid, value:char*
} SCtgDBCache;
typedef struct SCtgRentSlot {
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 612677634f277d8a372c1a149a6964af1c9eb950..7cba2c9eaf2f148f008665b31ffbbf77ec8c1238 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -144,6 +144,7 @@ typedef struct {
void* metaBlk; // for tmq fetching meta
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
+ int64_t lastScanUid;
} SStreamTaskInfo;
typedef struct SExecTaskInfo {
@@ -291,10 +292,12 @@ typedef struct STableScanInfo {
int32_t currentGroupId;
int32_t currentTable;
+#if 0
struct {
uint64_t uid;
int64_t ts;
} lastStatus;
+#endif
int8_t scanMode;
int8_t noTable;
diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c
index 5c482e89b34a311400ff27468d888bdb25dba601..12fcd103e889115b5f909d2918efe08ce03042be 100644
--- a/source/libs/executor/src/executorMain.c
+++ b/source/libs/executor/src/executorMain.c
@@ -335,8 +335,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pTableScanInfo->currentTable, tableSz);
- } else {
- // switch to log
}
} else {
@@ -352,6 +350,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
return 0;
}
+#if 0
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
@@ -371,3 +370,4 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
}
+#endif
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 8171c7e42ee1522da6e64fb2f21382f8ee0f20ec..cd6b2a83d1ef9bed89202520afdff35eceab1c52 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -2467,7 +2467,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->binfo.pRes);
cleanupAggSup(&pInfo->aggSup);
-
+
taosMemoryFreeClear(param);
}
@@ -2848,7 +2848,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
}
}
-
+#if 0
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
uint8_t type = pOperator->operatorType;
@@ -2933,6 +2933,7 @@ int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
return TSDB_CODE_SUCCESS;
}
+#endif
// this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
@@ -3344,8 +3345,8 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
- int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey
- : pInfo->existNewGroupBlock->info.window.ekey;
+ int64_t ekey =
+ Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
@@ -3681,14 +3682,14 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
cleanupBasicInfo(pInfo);
-
+
taosMemoryFreeClear(param);
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
- cleanupBasicInfo(&pInfo->binfo);
-
+ cleanupBasicInfo(&pInfo->binfo);
+
taosMemoryFreeClear(param);
}
@@ -3697,7 +3698,7 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(pInfo->p);
-
+
taosMemoryFreeClear(param);
}
@@ -3709,7 +3710,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo);
-
+
taosMemoryFreeClear(param);
}
@@ -3727,7 +3728,7 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup);
-
+
taosMemoryFreeClear(param);
}
@@ -3746,7 +3747,7 @@ void doDestroyExchangeOperatorInfo(void* param) {
}
tsem_destroy(&pExInfo->ready);
-
+
taosMemoryFreeClear(param);
}
@@ -3975,7 +3976,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
pInfo->win = win;
- pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
+ pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
taosMemoryFree(pInfo->pFillInfo);
taosMemoryFree(pInfo->p);
@@ -4476,7 +4477,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
- pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pPhyNode->pConditions, pTaskInfo);
+ pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
+ pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
@@ -4515,8 +4517,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
- pOptr =
- createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pPhyNode->pConditions, pTaskInfo);
+ pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
+ pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
@@ -4538,7 +4540,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode);
- pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, pTaskInfo);
+ pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
+ pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
@@ -4797,7 +4800,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
return TSDB_CODE_OUT_OF_MEMORY;
}
pInserterParam->readHandle = readHandle;
-
+
*pParam = pInserterParam;
break;
}
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 3d36ab16de5e768539dcf9660d10fa580bac9d7e..7410205d43b9b73f54271e8a79f11701f5af14b5 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -422,8 +422,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
// todo refactor
- pTableScanInfo->lastStatus.uid = pBlock->info.uid;
- pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;
+ /*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/
+ /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
+ pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
+ pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
+ pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.uid != 0);
return pBlock;
@@ -1191,9 +1194,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
- if (pBlockInfo->rows > 0) {
- return 0;
- }
return 0;
}
@@ -1219,7 +1219,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
- /*} else {*/
+ } else {
+ // data is filtered out, do clean
+
/*tDeleteSSDataBlock(&ret.data);*/
}
} else if (ret.fetchType == FETCH_TYPE__META) {
@@ -1228,13 +1230,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
- /*if (ret.offset.version == -1) {*/
- /*pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;*/
- /*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
- /*} else {*/
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
- /*}*/
return NULL;
} else {
ASSERT(0);
@@ -1354,72 +1351,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return NULL;
}
- pInfo->pRes->info.rows = block.info.rows;
- pInfo->pRes->info.uid = block.info.uid;
- pInfo->pRes->info.type = STREAM_NORMAL;
- pInfo->pRes->info.capacity = block.info.rows;
-
- uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t));
- if (groupIdPre) {
- pInfo->pRes->info.groupId = *groupIdPre;
- } else {
- pInfo->pRes->info.groupId = 0;
- }
-
- // for generating rollup SMA result, each time is an independent time serie.
- // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
- if (pInfo->assignBlockUid) {
- pInfo->pRes->info.groupId = block.info.uid;
- }
-
- // todo extract method
- for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
- SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
- if (!pColMatchInfo->output) {
- continue;
- }
-
- bool colExists = false;
- for (int32_t j = 0; j < blockDataGetNumOfCols(&block); ++j) {
- SColumnInfoData* pResCol = bdGetColumnInfoData(&block, j);
- if (pResCol->info.colId == pColMatchInfo->colId) {
- taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
- colExists = true;
- break;
- }
- }
-
- // the required column does not exists in submit block, let's set it to be all null value
- if (!colExists) {
- SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
- colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
- }
- }
-
- taosArrayDestroy(block.pDataBlock);
-
- ASSERT(pInfo->pRes->pDataBlock != NULL);
-#if 0
- if (pInfo->pRes->pDataBlock == NULL) {
- // TODO add log
- updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
- pOperator->status = OP_EXEC_DONE;
- pTaskInfo->code = terrno;
- return NULL;
- }
-#endif
-
- // currently only the tbname pseudo column
- if (pInfo->numOfPseudoExpr > 0) {
- code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
- GET_TASKID(pTaskInfo));
- if (code != TSDB_CODE_SUCCESS) {
- longjmp(pTaskInfo->env, code);
- }
- }
+ setBlockIntoRes(pInfo, &block);
- doFilter(pInfo->pCondition, pInfo->pRes);
- blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) {
break;
}
@@ -1449,12 +1382,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
+#if 0
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
- /*ASSERT(0);*/
+ ASSERT(0);
// check reader last status
// if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL;
+#endif
} else {
ASSERT(0);
@@ -1518,6 +1453,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle) {
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
+ if (pHandle->version > 0) {
+ pSTInfo->cond.endVersion = pHandle->version;
+ }
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
if (pHandle->initTableReader) {
diff --git a/source/libs/index/inc/indexFstCommon.h b/source/libs/index/inc/indexFstCommon.h
index 8335e437fb148b5f83f3b0db8fc2c6f63a4caa72..e15df5dc340ff4769e87f7329f7b4ec82d0ff4df 100644
--- a/source/libs/index/inc/indexFstCommon.h
+++ b/source/libs/index/inc/indexFstCommon.h
@@ -4,6 +4,7 @@
#include "tutil.h"
extern const uint8_t COMMON_INPUTS[];
extern const char COMMON_INPUTS_INV[];
+extern const int32_t COMMON_INPUTS_LEN;
#ifdef __cplusplus
extern "C" {
diff --git a/source/libs/index/inc/indexFstDfa.h b/source/libs/index/inc/indexFstDfa.h
index f6c220bcb7a6468d7be58249b54c62dcc2402306..9ca10897fd96cc1f83c12ae7a7114251599cf34f 100644
--- a/source/libs/index/inc/indexFstDfa.h
+++ b/source/libs/index/inc/indexFstDfa.h
@@ -29,16 +29,16 @@ extern "C" {
typedef struct FstDfa FstDfa;
typedef struct {
- SArray * insts;
+ SArray *insts;
uint32_t next[256];
bool isMatch;
-} State;
+} DfaState;
/*
* dfa builder related func
**/
typedef struct FstDfaBuilder {
- FstDfa * dfa;
+ FstDfa *dfa;
SHashObj *cache;
} FstDfaBuilder;
diff --git a/source/libs/index/inc/indexFstRegex.h b/source/libs/index/inc/indexFstRegex.h
index 2bf9c9b791397d634cc0704d5f556448d796c6aa..2814b5dc16bedf822a98be003230431ecb665cda 100644
--- a/source/libs/index/inc/indexFstRegex.h
+++ b/source/libs/index/inc/indexFstRegex.h
@@ -65,6 +65,7 @@ typedef struct {
} FstRegex;
FstRegex *regexCreate(const char *str);
+void regexDestroy(FstRegex *regex);
uint32_t regexAutomStart(FstRegex *regex);
bool regexAutomIsMatch(FstRegex *regex, uint32_t state);
diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c
index 040e8ed83029dba1d2f259509d800bfc740bd64b..05ce418037baeea854adfc57d2ad4b96fb6ac22d 100644
--- a/source/libs/index/src/indexCache.c
+++ b/source/libs/index/src/indexCache.c
@@ -22,7 +22,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000
-#define MEM_THRESHOLD 64 * 1024
+#define MEM_THRESHOLD 512 * 1024
#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20
#define MEM_ESTIMATE_RADIO 1.5
@@ -204,7 +204,6 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
if (0 == strcmp(c->colVal, pCt->colVal)) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
- // taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
@@ -309,7 +308,6 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
if (cond == MATCH) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
- // taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
diff --git a/source/libs/index/src/indexFst.c b/source/libs/index/src/indexFst.c
index 81ac4c9d40bb13cf06446e03375f91dd0c495af7..c4b83f8a073464c232d916ea30b2a2859ccb1e22 100644
--- a/source/libs/index/src/indexFst.c
+++ b/source/libs/index/src/indexFst.c
@@ -1307,7 +1307,6 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) {
taosArrayPush(sws->inp, &(trn.inp));
if (FST_NODE_IS_FINAL(nextNode)) {
- // void *eofState = sws->aut->acceptEof(nextState);
void* eofState = automFuncs[aut->type].acceptEof(aut, nextState);
if (eofState != NULL) {
isMatch = automFuncs[aut->type].isMatch(aut, eofState);
diff --git a/source/libs/index/src/indexFstCommon.c b/source/libs/index/src/indexFstCommon.c
index 902e68ce091c0a7316f5404515a742a911fb603e..0b201570090878b6227ebfa2b3c552049a895aaf 100644
--- a/source/libs/index/src/indexFstCommon.c
+++ b/source/libs/index/src/indexFstCommon.c
@@ -294,3 +294,4 @@ const char COMMON_INPUTS_INV[] = {
'\xee', '\xef', '\xf0', '\xf1', '\xf2', '\xf3', '\xf4', '\xf5', '\xf6', '\xf7', '\xf8', '\xf9', '\xfa', '\xfb',
'\xfc', '\xfd', '\xfe', '\xff',
};
+const int32_t COMMON_INPUTS_LEN = sizeof(COMMON_INPUTS) / sizeof(COMMON_INPUTS[0]);
diff --git a/source/libs/index/src/indexFstDfa.c b/source/libs/index/src/indexFstDfa.c
index 3011f124c912c75b59eaade7a90239fc2adbe0b9..b820f16a2a00f3ca13453bcc61034e840b79ccb2 100644
--- a/source/libs/index/src/indexFstDfa.c
+++ b/source/libs/index/src/indexFstDfa.c
@@ -41,7 +41,7 @@ FstDfaBuilder *dfaBuilderCreate(SArray *insts) {
return NULL;
}
- SArray *states = taosArrayInit(4, sizeof(State));
+ SArray *states = taosArrayInit(4, sizeof(DfaState));
builder->dfa = dfaCreate(insts, states);
builder->cache = taosHashInit(
@@ -71,7 +71,7 @@ FstDfa *dfaBuilder(FstDfaBuilder *builder) {
dfaAdd(builder->dfa, cur, 0);
- SArray * states = taosArrayInit(0, sizeof(uint32_t));
+ SArray *states = taosArrayInit(0, sizeof(uint32_t));
uint32_t result;
if (dfaBuilderCachedState(builder, cur, &result)) {
taosArrayPush(states, &result);
@@ -98,10 +98,12 @@ FstDfa *dfaBuilder(FstDfaBuilder *builder) {
return builder->dfa;
}
+FstDfa *dfaBuilderBuild(FstDfaBuilder *builer) { return NULL; }
+
bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet *next, uint32_t state, uint8_t byte,
uint32_t *result) {
sparSetClear(cur);
- State *t = taosArrayGet(builder->dfa->states, state);
+ DfaState *t = taosArrayGet(builder->dfa->states, state);
for (int i = 0; i < taosArrayGetSize(t->insts); i++) {
uint32_t ip = *(int32_t *)taosArrayGet(t->insts, i);
sparSetAdd(cur, ip);
@@ -144,7 +146,7 @@ bool dfaBuilderCachedState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *
*result = *v;
taosArrayDestroy(tinsts);
} else {
- State st;
+ DfaState st;
st.insts = tinsts;
st.isMatch = isMatch;
taosArrayPush(builder->dfa->states, &st);
@@ -169,14 +171,14 @@ bool dfaIsMatch(FstDfa *dfa, uint32_t si) {
if (dfa->states == NULL || si < taosArrayGetSize(dfa->states)) {
return false;
}
- State *st = taosArrayGet(dfa->states, si);
+ DfaState *st = taosArrayGet(dfa->states, si);
return st != NULL ? st->isMatch : false;
}
bool dfaAccept(FstDfa *dfa, uint32_t si, uint8_t byte, uint32_t *result) {
if (dfa->states == NULL || si < taosArrayGetSize(dfa->states)) {
return false;
}
- State *st = taosArrayGet(dfa->states, si);
+ DfaState *st = taosArrayGet(dfa->states, si);
*result = st->next[byte];
return true;
}
diff --git a/source/libs/index/src/indexFstRegex.c b/source/libs/index/src/indexFstRegex.c
index 33eeae802e536a74c59748816ce09f1140dab0ff..37cb58996f2f5910a6dd520e9982f4a195553b4b 100644
--- a/source/libs/index/src/indexFstRegex.c
+++ b/source/libs/index/src/indexFstRegex.c
@@ -23,7 +23,7 @@ FstRegex *regexCreate(const char *str) {
return NULL;
}
int32_t sz = (int32_t)strlen(str);
- char * orig = taosMemoryCalloc(1, sz);
+ char *orig = taosMemoryCalloc(1, sz);
memcpy(orig, str, sz);
regex->orig = orig;
@@ -36,6 +36,12 @@ FstRegex *regexCreate(const char *str) {
return regex;
}
+void regexDestroy(FstRegex *regex) {
+ if (regex == NULL) return;
+ taosMemoryFree(regex->orig);
+ taosMemoryFree(regex);
+}
+
uint32_t regexAutomStart(FstRegex *regex) {
///// no nothing
return 0;
diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt
index 9a4b7bbad8d34fd99e5160e3414dc7c77421633f..b3eca280032e56004e649b2d2cef44ec1672d8ac 100644
--- a/source/libs/index/test/CMakeLists.txt
+++ b/source/libs/index/test/CMakeLists.txt
@@ -4,6 +4,7 @@ IF(NOT TD_DARWIN)
add_executable(idxFstUT "")
add_executable(idxUtilUT "")
add_executable(idxJsonUT "")
+ add_executable(idxFstUtilUT "")
target_sources(idxTest
PRIVATE
@@ -23,6 +24,25 @@ IF(NOT TD_DARWIN)
"utilUT.cc"
)
+ target_sources(idxJsonUT
+ PRIVATE
+ "jsonUT.cc"
+ )
+ target_sources(idxFstUtilUT
+ PRIVATE
+ "fstUtilUT.cc"
+ )
+
+ target_include_directories (idxTest
+ PUBLIC
+ "${TD_SOURCE_DIR}/include/libs/index"
+ "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
+ )
+ target_include_directories (idxFstTest
+ PUBLIC
+ "${TD_SOURCE_DIR}/include/libs/index"
+ "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
+ )
target_sources(idxJsonUT
PRIVATE
"jsonUT.cc"
@@ -50,6 +70,38 @@ IF(NOT TD_DARWIN)
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
+ target_include_directories (idxJsonUT
+ PUBLIC
+ "${TD_SOURCE_DIR}/include/libs/index"
+ "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
+ )
+ target_include_directories (idxFstUtilUT
+ PUBLIC
+ "${TD_SOURCE_DIR}/include/libs/index"
+ "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
+ )
+
+ target_link_libraries (idxTest
+ os
+ util
+ common
+ gtest_main
+ index
+ )
+ target_link_libraries (idxFstTest
+ os
+ util
+ common
+ gtest_main
+ index
+ )
+ target_link_libraries (idxFstUT
+ os
+ util
+ common
+ gtest_main
+ index
+ )
target_include_directories (idxJsonUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
@@ -92,15 +144,28 @@ IF(NOT TD_DARWIN)
gtest_main
index
)
-
- add_test(
- NAME idxtest
- COMMAND idxTest
+ target_link_libraries (idxFstUtilUT
+ os
+ util
+ common
+ gtest_main
+ index
)
+
add_test(
NAME idxJsonUT
COMMAND idxJsonUT
)
+ add_test(
+ NAME idxFstUtilUT
+ COMMAND idxFstUtilUT
+
+ )
+
+ add_test(
+ NAME idxtest
+ COMMAND idxTest
+ )
add_test(
NAME idxUtilUT
COMMAND idxUtilUT
@@ -109,4 +174,4 @@ IF(NOT TD_DARWIN)
NAME idxFstUT
COMMAND idxFstUT
)
-ENDIF ()
\ No newline at end of file
+ENDIF ()
diff --git a/source/libs/index/test/fstUtilUT.cc b/source/libs/index/test/fstUtilUT.cc
new file mode 100644
index 0000000000000000000000000000000000000000..2c29758756704bafc0e650b04771af17bf57199e
--- /dev/null
+++ b/source/libs/index/test/fstUtilUT.cc
@@ -0,0 +1,70 @@
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include "index.h"
+#include "indexCache.h"
+#include "indexFst.h"
+#include "indexFstDfa.h"
+#include "indexFstRegex.h"
+#include "indexFstSparse.h"
+#include "indexFstUtil.h"
+#include "indexInt.h"
+#include "indexTfile.h"
+#include "tglobal.h"
+#include "tlog.h"
+#include "tskiplist.h"
+#include "tutil.h"
+class FstUtilEnv : public ::testing::Test {
+ protected:
+ virtual void SetUp() {
+ SArray *inst = taosArrayInit(4, sizeof(char));
+ builder = dfaBuilderCreate(inst);
+ }
+ virtual void TearDown() { dfaBuilderDestroy(builder); }
+
+ FstDfaBuilder *builder;
+};
+
+class FstRegexEnv : public ::testing::Test {
+ protected:
+ virtual void SetUp() { regex = regexCreate("test"); }
+ virtual void TearDown() { regexDestroy(regex); }
+ FstRegex *regex;
+};
+
+class FstSparseSetEnv : public ::testing::Test {
+ protected:
+ virtual void SetUp() { set = sparSetCreate(256); }
+ virtual void TearDown() {
+ // tear down
+ sparSetDestroy(set);
+ }
+ void ReBuild(int32_t sz) {
+ sparSetDestroy(set);
+ set = sparSetCreate(sz);
+ }
+ FstSparseSet *set;
+};
+
+// test FstDfaBuilder
+TEST_F(FstUtilEnv, test1) {}
+TEST_F(FstUtilEnv, test2) {}
+TEST_F(FstUtilEnv, test3) {}
+TEST_F(FstUtilEnv, test4) {}
+
+// test FstRegex
+
+TEST_F(FstRegexEnv, test1) {}
+TEST_F(FstRegexEnv, test2) {}
+TEST_F(FstRegexEnv, test3) {}
+TEST_F(FstRegexEnv, test4) {}
+
+// test FstSparseSet
+TEST_F(FstSparseSetEnv, test1) {}
+TEST_F(FstSparseSetEnv, test2) {}
+TEST_F(FstSparseSetEnv, test3) {}
+TEST_F(FstSparseSetEnv, test4) {}
diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc
index ab5128cd3eaf6c35980c4d8dcc32f1c792b3559b..323a6b4afae23ba08bec6dac080223c4c184ddad 100644
--- a/source/libs/index/test/utilUT.cc
+++ b/source/libs/index/test/utilUT.cc
@@ -8,6 +8,7 @@
#include "indexCache.h"
#include "indexComm.h"
#include "indexFst.h"
+#include "indexFstCommon.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
@@ -356,3 +357,11 @@ TEST_F(UtilEnv, TempResultExcept) {
idxTRsltMergeTo(relt, f);
EXPECT_EQ(taosArrayGetSize(f), 1);
}
+
+TEST_F(UtilEnv, testDictComm) {
+ int32_t count = COMMON_INPUTS_LEN;
+ for (int i = 0; i < 256; i++) {
+ uint8_t v = COMMON_INPUTS_INV[i];
+ EXPECT_EQ(COMMON_INPUTS[v], i);
+ }
+}
diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c
index cf8116fee74a49b49198969d268a5dac62624889..757279269af8ed2e32221918ea42165eb19e9588 100644
--- a/source/libs/qworker/src/qworker.c
+++ b/source/libs/qworker/src/qworker.c
@@ -517,7 +517,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->queryType = qwMsg->msgType;
- QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
+ //QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c
index 2cbe1e5c96e04fe4353468ed1411ab07fb2ea00b..408249404e834a387d52ca4e6601fa5d9fc4ece8 100644
--- a/source/libs/scalar/src/scalar.c
+++ b/source/libs/scalar/src/scalar.c
@@ -578,7 +578,7 @@ _return:
SCL_RET(code);
}
-EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
+EDealRes sclRewriteNullInOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
if (opType <= OP_TYPE_CALC_MAX) {
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
@@ -610,6 +610,24 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT
return DEAL_RES_CONTINUE;
}
+EDealRes sclAggFuncWalker(SNode* pNode, void* pContext) {
+ if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
+ SFunctionNode* pFunc = (SFunctionNode*)pNode;
+ *(bool*)pContext = fmIsAggFunc(pFunc->funcId);
+ if (*(bool*)pContext) {
+ return DEAL_RES_END;
+ }
+ }
+
+ return DEAL_RES_CONTINUE;
+}
+
+
+bool sclContainsAggFuncNode(SNode* pNode) {
+ bool aggFunc = false;
+ nodesWalkExpr(pNode, sclAggFuncWalker, (void *)&aggFunc);
+ return aggFunc;
+}
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
SOperatorNode *node = (SOperatorNode *)*pNode;
@@ -617,8 +635,9 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
SValueNode *valueNode = (SValueNode *)node->pLeft;
- if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
- return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
+ if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)
+ && (!sclContainsAggFuncNode(node->pRight))) {
+ return sclRewriteNullInOptr(pNode, ctx, node->opType);
}
if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight)
@@ -633,8 +652,9 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
SValueNode *valueNode = (SValueNode *)node->pRight;
- if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
- return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
+ if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)
+ && (!sclContainsAggFuncNode(node->pLeft))) {
+ return sclRewriteNullInOptr(pNode, ctx, node->opType);
}
if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft)
@@ -656,7 +676,7 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
ERASE_NODE(listNode->pNodeList);
continue;
} else { //OP_TYPE_NOT_IN
- return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
+ return sclRewriteNullInOptr(pNode, ctx, node->opType);
}
}
@@ -664,7 +684,7 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
}
if (listNode->pNodeList->length <= 0) {
- return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
+ return sclRewriteNullInOptr(pNode, ctx, node->opType);
}
}
diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh
index 409fb5e930c5650b6cdb38bb09a9f0839398f486..9ed34daf69808903296cbc19cc023e68297668d8 100755
--- a/tests/system-test/fulltest.sh
+++ b/tests/system-test/fulltest.sh
@@ -120,7 +120,7 @@ python3 ./test.py -f 2-query/irate.py
python3 ./test.py -f 2-query/and_or_for_byte.py
python3 ./test.py -f 2-query/function_null.py
-python3 ./test.py -f 2-query/queryQnode.py
+#python3 ./test.py -f 2-query/queryQnode.py
#python3 ./test.py -f 6-cluster/5dnode1mnode.py
#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3