提交 ac8e9d10 编写于 作者: H Haojun Liao

other: merge 3.0

...@@ -36,6 +36,7 @@ typedef struct SReadHandle { ...@@ -36,6 +36,7 @@ typedef struct SReadHandle {
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
int64_t version;
bool initMetaReader; bool initMetaReader;
bool initTableReader; bool initTableReader;
bool initTqReader; bool initTqReader;
......
...@@ -264,15 +264,12 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { ...@@ -264,15 +264,12 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
static int32_t emptyRspNum = 0; static int32_t emptyRspNum = 0;
if (code != 0) {
taosMemoryFreeClear(param);
return -1;
}
char *key = (char *)param; char *key = (char *)param;
SClientHbBatchRsp pRsp = {0}; SClientHbBatchRsp pRsp = {0};
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); if (TSDB_CODE_SUCCESS == code) {
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
}
int32_t rspNum = taosArrayGetSize(pRsp.rsps); int32_t rspNum = taosArrayGetSize(pRsp.rsps);
taosThreadMutexLock(&appInfo.mutex); taosThreadMutexLock(&appInfo.mutex);
...@@ -288,6 +285,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -288,6 +285,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
if (code != 0) {
(*pInst)->onlineDnodes = 0;
}
if (rspNum) { if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cJSON.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "parser.h" #include "parser.h"
...@@ -23,7 +24,6 @@ ...@@ -23,7 +24,6 @@
#include "tqueue.h" #include "tqueue.h"
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
#include "cJSON.h"
int32_t tmqAskEp(tmq_t* tmq, bool async); int32_t tmqAskEp(tmq_t* tmq, bool async);
...@@ -106,8 +106,8 @@ struct tmq_t { ...@@ -106,8 +106,8 @@ struct tmq_t {
tsem_t rspSem; tsem_t rspSem;
}; };
struct tmq_raw_data{ struct tmq_raw_data {
void *raw_meta; void* raw_meta;
int32_t raw_meta_len; int32_t raw_meta_len;
int16_t raw_meta_type; int16_t raw_meta_type;
}; };
...@@ -953,6 +953,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -953,6 +953,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto FAIL; goto FAIL;
} }
tscInfo("consumer %ld is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
return pTmq; return pTmq;
FAIL: FAIL:
...@@ -1194,10 +1196,10 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { ...@@ -1194,10 +1196,10 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
for (int32_t j = 0; j < vgNumCur; j++) { for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[50]; char buf[80];
tFormatOffset(buf, 50, &pVgCur->currentOffsetNew); tFormatOffset(buf, 80, &pVgCur->currentOffsetNew);
tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch, pVgCur->vgId, tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
vgKey, buf); pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal)); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
} }
} }
...@@ -1564,7 +1566,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1564,7 +1566,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus != TMQ_VG_STATUS__IDLE) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
vgSkipCnt);
continue; continue;
/*if (vgSkipCnt < 10000) continue;*/ /*if (vgSkipCnt < 10000) continue;*/
#if 0 #if 0
...@@ -1620,8 +1623,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1620,8 +1623,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
char offsetFormatBuf[80]; char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew); tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64, tmq->consumerId, tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
...@@ -1669,7 +1672,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1669,7 +1672,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset; pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) { if (pollRspWrapper->dataRsp.blockNum == 0) {
...@@ -1691,7 +1695,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1691,7 +1695,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset; pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset;
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG; pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
...@@ -1848,9 +1853,9 @@ const char* tmq_get_table_name(TAOS_RES* res) { ...@@ -1848,9 +1853,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL; return NULL;
} }
tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) { tmq_raw_data* tmq_get_raw_meta(TAOS_RES* res) {
if (TD_RES_TMQ_META(res)) { if (TD_RES_TMQ_META(res)) {
tmq_raw_data *raw = taosMemoryCalloc(1, sizeof(tmq_raw_data)); tmq_raw_data* raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
raw->raw_meta = pMetaRspObj->metaRsp.metaRsp; raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen; raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
...@@ -1860,7 +1865,8 @@ tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) { ...@@ -1860,7 +1865,8 @@ tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) {
return NULL; return NULL;
} }
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) {
char* string = NULL; char* string = NULL;
cJSON* json = cJSON_CreateObject(); cJSON* json = cJSON_CreateObject();
if (json == NULL) { if (json == NULL) {
...@@ -1870,31 +1876,31 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch ...@@ -1870,31 +1876,31 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch
cJSON_AddItemToObject(json, "type", type); cJSON_AddItemToObject(json, "type", type);
char uid[32] = {0}; char uid[32] = {0};
sprintf(uid, "%"PRIi64, id); sprintf(uid, "%" PRIi64, id);
cJSON* id_ = cJSON_CreateString(uid); cJSON* id_ = cJSON_CreateString(uid);
cJSON_AddItemToObject(json, "id", id_); cJSON_AddItemToObject(json, "id", id_);
cJSON* tableName = cJSON_CreateString(name); cJSON* tableName = cJSON_CreateString(name);
cJSON_AddItemToObject(json, "tableName", tableName); cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"); cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
cJSON_AddItemToObject(json, "tableType", tableType); cJSON_AddItemToObject(json, "tableType", tableType);
// cJSON* version = cJSON_CreateNumber(1); // cJSON* version = cJSON_CreateNumber(1);
// cJSON_AddItemToObject(json, "version", version); // cJSON_AddItemToObject(json, "version", version);
cJSON* columns = cJSON_CreateArray(); cJSON* columns = cJSON_CreateArray();
for(int i = 0; i < schemaRow->nCols; i++){ for (int i = 0; i < schemaRow->nCols; i++) {
cJSON* column = cJSON_CreateObject(); cJSON* column = cJSON_CreateObject();
SSchema *s = schemaRow->pSchema + i; SSchema* s = schemaRow->pSchema + i;
cJSON* cname = cJSON_CreateString(s->name); cJSON* cname = cJSON_CreateString(s->name);
cJSON_AddItemToObject(column, "name", cname); cJSON_AddItemToObject(column, "name", cname);
cJSON* ctype = cJSON_CreateNumber(s->type); cJSON* ctype = cJSON_CreateNumber(s->type);
cJSON_AddItemToObject(column, "type", ctype); cJSON_AddItemToObject(column, "type", ctype);
if(s->type == TSDB_DATA_TYPE_BINARY){ if (s->type == TSDB_DATA_TYPE_BINARY) {
int32_t length = s->bytes - VARSTR_HEADER_SIZE; int32_t length = s->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes); cJSON_AddItemToObject(column, "length", cbytes);
}else if (s->type == TSDB_DATA_TYPE_NCHAR){ } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes); cJSON_AddItemToObject(column, "length", cbytes);
} }
cJSON_AddItemToArray(columns, column); cJSON_AddItemToArray(columns, column);
...@@ -1902,20 +1908,20 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch ...@@ -1902,20 +1908,20 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch
cJSON_AddItemToObject(json, "columns", columns); cJSON_AddItemToObject(json, "columns", columns);
cJSON* tags = cJSON_CreateArray(); cJSON* tags = cJSON_CreateArray();
for(int i = 0; schemaTag && i < schemaTag->nCols; i++){ for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
cJSON* tag = cJSON_CreateObject(); cJSON* tag = cJSON_CreateObject();
SSchema *s = schemaTag->pSchema + i; SSchema* s = schemaTag->pSchema + i;
cJSON* tname = cJSON_CreateString(s->name); cJSON* tname = cJSON_CreateString(s->name);
cJSON_AddItemToObject(tag, "name", tname); cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(s->type); cJSON* ttype = cJSON_CreateNumber(s->type);
cJSON_AddItemToObject(tag, "type", ttype); cJSON_AddItemToObject(tag, "type", ttype);
if(s->type == TSDB_DATA_TYPE_BINARY){ if (s->type == TSDB_DATA_TYPE_BINARY) {
int32_t length = s->bytes - VARSTR_HEADER_SIZE; int32_t length = s->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(tag, "length", cbytes); cJSON_AddItemToObject(tag, "length", cbytes);
}else if (s->type == TSDB_DATA_TYPE_NCHAR){ } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(tag, "length", cbytes); cJSON_AddItemToObject(tag, "length", cbytes);
} }
cJSON_AddItemToArray(tags, tag); cJSON_AddItemToArray(tags, tag);
...@@ -1927,13 +1933,13 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch ...@@ -1927,13 +1933,13 @@ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* sch
return string; return string;
} }
static char *processCreateStb(SMqMetaRsp *metaRsp){ static char* processCreateStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
char* string = NULL; char* string = NULL;
// decode and process req // decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len); tDecoderInit(&coder, data, len);
...@@ -1949,7 +1955,7 @@ _err: ...@@ -1949,7 +1955,7 @@ _err:
return string; return string;
} }
static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){ static char* buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id) {
char* string = NULL; char* string = NULL;
cJSON* json = cJSON_CreateObject(); cJSON* json = cJSON_CreateObject();
if (json == NULL) { if (json == NULL) {
...@@ -1958,7 +1964,7 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t ...@@ -1958,7 +1964,7 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t
cJSON* type = cJSON_CreateString("create"); cJSON* type = cJSON_CreateString("create");
cJSON_AddItemToObject(json, "type", type); cJSON_AddItemToObject(json, "type", type);
char cid[32] = {0}; char cid[32] = {0};
sprintf(cid, "%"PRIi64, id); sprintf(cid, "%" PRIi64, id);
cJSON* cid_ = cJSON_CreateString(cid); cJSON* cid_ = cJSON_CreateString(cid);
cJSON_AddItemToObject(json, "id", cid_); cJSON_AddItemToObject(json, "id", cid_);
...@@ -1968,19 +1974,19 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t ...@@ -1968,19 +1974,19 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t
cJSON_AddItemToObject(json, "tableType", tableType); cJSON_AddItemToObject(json, "tableType", tableType);
char sid_[32] = {0}; char sid_[32] = {0};
sprintf(sid_, "%"PRIi64, sid); sprintf(sid_, "%" PRIi64, sid);
cJSON* using = cJSON_CreateString(sid_); cJSON* using = cJSON_CreateString(sid_);
cJSON_AddItemToObject(json, "using", using); cJSON_AddItemToObject(json, "using", using);
// cJSON* version = cJSON_CreateNumber(1); // cJSON* version = cJSON_CreateNumber(1);
// cJSON_AddItemToObject(json, "version", version); // cJSON_AddItemToObject(json, "version", version);
cJSON* tags = cJSON_CreateArray(); cJSON* tags = cJSON_CreateArray();
if (tTagIsJson(pTag)) { // todo if (tTagIsJson(pTag)) { // todo
char* pJson = parseTagDatatoJson(pTag); char* pJson = parseTagDatatoJson(pTag);
cJSON* tag = cJSON_CreateObject(); cJSON* tag = cJSON_CreateObject();
cJSON* tname = cJSON_CreateString("unknown"); // todo cJSON* tname = cJSON_CreateString("unknown"); // todo
cJSON_AddItemToObject(tag, "name", tname); cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON); cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
cJSON_AddItemToObject(tag, "type", ttype); cJSON_AddItemToObject(tag, "type", ttype);
...@@ -1999,12 +2005,12 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t ...@@ -1999,12 +2005,12 @@ static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t
goto end; goto end;
} }
for(int i = 0; i < taosArrayGetSize(pTagVals); i++){ for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
cJSON* tag = cJSON_CreateObject(); cJSON* tag = cJSON_CreateObject();
// cJSON* tname = cJSON_CreateNumber(pTagVal->cid); // cJSON* tname = cJSON_CreateNumber(pTagVal->cid);
cJSON* tname = cJSON_CreateString("unkonwn"); // todo cJSON* tname = cJSON_CreateString("unkonwn"); // todo
cJSON_AddItemToObject(tag, "name", tname); cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(pTagVal->type); cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
cJSON_AddItemToObject(tag, "type", ttype); cJSON_AddItemToObject(tag, "type", ttype);
...@@ -2032,13 +2038,13 @@ end: ...@@ -2032,13 +2038,13 @@ end:
return string; return string;
} }
static char *processCreateTable(SMqMetaRsp *metaRsp){ static char* processCreateTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SVCreateTbReq *pCreateReq; SVCreateTbReq* pCreateReq;
char *string = NULL; char* string = NULL;
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len); tDecoderInit(&decoder, data, len);
if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) { if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
...@@ -2048,27 +2054,29 @@ static char *processCreateTable(SMqMetaRsp *metaRsp){ ...@@ -2048,27 +2054,29 @@ static char *processCreateTable(SMqMetaRsp *metaRsp){
// loop to create table // loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE){ if (pCreateReq->type == TSDB_CHILD_TABLE) {
string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid); string =
}else if(pCreateReq->type == TSDB_NORMAL_TABLE){ buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid);
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
string =
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
} }
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
_exit: _exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
} }
static char *processAlterTable(SMqMetaRsp *metaRsp){ static char* processAlterTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVAlterTbReq vAlterTbReq = {0}; SVAlterTbReq vAlterTbReq = {0};
char *string = NULL; char* string = NULL;
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len); tDecoderInit(&decoder, data, len);
if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) { if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
...@@ -2081,8 +2089,8 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){ ...@@ -2081,8 +2089,8 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
} }
cJSON* type = cJSON_CreateString("alter"); cJSON* type = cJSON_CreateString("alter");
cJSON_AddItemToObject(json, "type", type); cJSON_AddItemToObject(json, "type", type);
// cJSON* uid = cJSON_CreateNumber(id); // cJSON* uid = cJSON_CreateNumber(id);
// cJSON_AddItemToObject(json, "uid", uid); // cJSON_AddItemToObject(json, "uid", uid);
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName); cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
cJSON_AddItemToObject(json, "tableName", tableName); cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal"); cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
...@@ -2097,43 +2105,43 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){ ...@@ -2097,43 +2105,43 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
cJSON_AddItemToObject(json, "colType", colType); cJSON_AddItemToObject(json, "colType", colType);
if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){ if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes); cJSON_AddItemToObject(json, "colLength", cbytes);
}else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){ } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes); cJSON_AddItemToObject(json, "colLength", cbytes);
} }
break; break;
} }
case TSDB_ALTER_TABLE_DROP_COLUMN:{ case TSDB_ALTER_TABLE_DROP_COLUMN: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN); cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN);
cJSON_AddItemToObject(json, "alterType", alterType); cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName); cJSON_AddItemToObject(json, "colName", colName);
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{ case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES); cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES);
cJSON_AddItemToObject(json, "alterType", alterType); cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName); cJSON_AddItemToObject(json, "colName", colName);
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
cJSON_AddItemToObject(json, "colType", colType); cJSON_AddItemToObject(json, "colType", colType);
if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){ if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY) {
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes); cJSON_AddItemToObject(json, "colLength", cbytes);
}else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){ } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes); cJSON_AddItemToObject(json, "colLength", cbytes);
} }
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{ case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME); cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME);
cJSON_AddItemToObject(json, "alterType", alterType); cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
...@@ -2142,12 +2150,12 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){ ...@@ -2142,12 +2150,12 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
cJSON_AddItemToObject(json, "colNewName", colNewName); cJSON_AddItemToObject(json, "colNewName", colNewName);
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:{ case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_TAG_VAL); cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_TAG_VAL);
cJSON_AddItemToObject(json, "alterType", alterType); cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName); cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
cJSON_AddItemToObject(json, "colName", tagName); cJSON_AddItemToObject(json, "colName", tagName);
cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo
cJSON_AddItemToObject(json, "colValue", colValue); cJSON_AddItemToObject(json, "colValue", colValue);
cJSON* isNull = cJSON_CreateBool(vAlterTbReq.isNull); cJSON* isNull = cJSON_CreateBool(vAlterTbReq.isNull);
cJSON_AddItemToObject(json, "colValueNull", isNull); cJSON_AddItemToObject(json, "colValueNull", isNull);
...@@ -2158,18 +2166,18 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){ ...@@ -2158,18 +2166,18 @@ static char *processAlterTable(SMqMetaRsp *metaRsp){
} }
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
} }
static char *processDropSTable(SMqMetaRsp *metaRsp){ static char* processDropSTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVDropStbReq req = {0}; SVDropStbReq req = {0};
char *string = NULL; char* string = NULL;
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len); tDecoderInit(&decoder, data, len);
if (tDecodeSVDropStbReq(&decoder, &req) < 0) { if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
...@@ -2183,7 +2191,7 @@ static char *processDropSTable(SMqMetaRsp *metaRsp){ ...@@ -2183,7 +2191,7 @@ static char *processDropSTable(SMqMetaRsp *metaRsp){
cJSON* type = cJSON_CreateString("drop"); cJSON* type = cJSON_CreateString("drop");
cJSON_AddItemToObject(json, "type", type); cJSON_AddItemToObject(json, "type", type);
char uid[32] = {0}; char uid[32] = {0};
sprintf(uid, "%"PRIi64, req.suid); sprintf(uid, "%" PRIi64, req.suid);
cJSON* id = cJSON_CreateString(uid); cJSON* id = cJSON_CreateString(uid);
cJSON_AddItemToObject(json, "id", id); cJSON_AddItemToObject(json, "id", id);
cJSON* tableName = cJSON_CreateString(req.name); cJSON* tableName = cJSON_CreateString(req.name);
...@@ -2193,18 +2201,18 @@ static char *processDropSTable(SMqMetaRsp *metaRsp){ ...@@ -2193,18 +2201,18 @@ static char *processDropSTable(SMqMetaRsp *metaRsp){
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
} }
static char *processDropTable(SMqMetaRsp *metaRsp){ static char* processDropTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVDropTbBatchReq req = {0}; SVDropTbBatchReq req = {0};
char *string = NULL; char* string = NULL;
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len); tDecoderInit(&decoder, data, len);
if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) { if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
...@@ -2217,67 +2225,67 @@ static char *processDropTable(SMqMetaRsp *metaRsp){ ...@@ -2217,67 +2225,67 @@ static char *processDropTable(SMqMetaRsp *metaRsp){
} }
cJSON* type = cJSON_CreateString("drop"); cJSON* type = cJSON_CreateString("drop");
cJSON_AddItemToObject(json, "type", type); cJSON_AddItemToObject(json, "type", type);
// cJSON* uid = cJSON_CreateNumber(id); // cJSON* uid = cJSON_CreateNumber(id);
// cJSON_AddItemToObject(json, "uid", uid); // cJSON_AddItemToObject(json, "uid", uid);
// cJSON* tableType = cJSON_CreateString("normal"); // cJSON* tableType = cJSON_CreateString("normal");
// cJSON_AddItemToObject(json, "tableType", tableType); // cJSON_AddItemToObject(json, "tableType", tableType);
cJSON* tableNameList = cJSON_CreateArray(); cJSON* tableNameList = cJSON_CreateArray();
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq* pDropTbReq = req.pReqs + iReq; SVDropTbReq* pDropTbReq = req.pReqs + iReq;
cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo
cJSON_AddItemToArray(tableNameList, tableName); cJSON_AddItemToArray(tableNameList, tableName);
} }
cJSON_AddItemToObject(json, "tableNameList", tableNameList); cJSON_AddItemToObject(json, "tableNameList", tableNameList);
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
} }
char *tmq_get_json_meta(TAOS_RES *res){ char* tmq_get_json_meta(TAOS_RES* res) {
if (!TD_RES_TMQ_META(res)) { if (!TD_RES_TMQ_META(res)) {
return NULL; return NULL;
} }
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){ if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
return processCreateStb(&pMetaRspObj->metaRsp); return processCreateStb(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) {
return processCreateStb(&pMetaRspObj->metaRsp); return processCreateStb(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) {
return processDropSTable(&pMetaRspObj->metaRsp); return processDropSTable(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) {
return processCreateTable(&pMetaRspObj->metaRsp); return processCreateTable(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE){ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) {
return processAlterTable(&pMetaRspObj->metaRsp); return processAlterTable(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE){ } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
return processDropTable(&pMetaRspObj->metaRsp); return processDropTable(&pMetaRspObj->metaRsp);
} }
return NULL; return NULL;
} }
static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
SMCreateStbReq pReq = {0}; SMCreateStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
if(!pRequest->pDb){ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end; goto end;
} }
// decode and process req // decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead); int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len); tDecoderInit(&coder, data, len);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
...@@ -2286,16 +2294,16 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2286,16 +2294,16 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
} }
// build create stable // build create stable
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField)); pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
for(int32_t i = 0; i < req.schemaRow.nCols; i++){ for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
SSchema* pSchema = req.schemaRow.pSchema + i; SSchema* pSchema = req.schemaRow.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes}; SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name); strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pColumns, &field); taosArrayPush(pReq.pColumns, &field);
} }
pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField)); pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
for(int32_t i = 0; i < req.schemaTag.nCols; i++){ for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
SSchema* pSchema = req.schemaTag.pSchema + i; SSchema* pSchema = req.schemaTag.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes}; SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name); strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pTags, &field); taosArrayPush(pReq.pTags, &field);
} }
...@@ -2323,7 +2331,7 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2323,7 +2331,7 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
} }
tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq); tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
SQuery pQuery = {0}; SQuery pQuery = {0};
pQuery.execMode = QUERY_EXEC_MODE_RPC; pQuery.execMode = QUERY_EXEC_MODE_RPC;
pQuery.pCmdMsg = &pCmdMsg; pQuery.pCmdMsg = &pCmdMsg;
pQuery.msgType = pQuery.pCmdMsg->msgType; pQuery.msgType = pQuery.pCmdMsg->msgType;
...@@ -2333,18 +2341,18 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2333,18 +2341,18 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
end: end:
destroyRequest(pRequest); destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq); tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder); tDecoderClear(&coder);
return code; return code;
} }
static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
SVDropStbReq req = {0}; SVDropStbReq req = {0};
SDecoder coder; SDecoder coder;
SMDropStbReq pReq = {0}; SMDropStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
...@@ -2352,12 +2360,12 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2352,12 +2360,12 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
goto end; goto end;
} }
if(!pRequest->pDb){ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end; goto end;
} }
// decode and process req // decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead); int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len); tDecoderInit(&coder, data, len);
if (tDecodeSVDropStbReq(&coder, &req) < 0) { if (tDecodeSVDropStbReq(&coder, &req) < 0) {
...@@ -2386,7 +2394,7 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2386,7 +2394,7 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
} }
tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq); tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
SQuery pQuery = {0}; SQuery pQuery = {0};
pQuery.execMode = QUERY_EXEC_MODE_RPC; pQuery.execMode = QUERY_EXEC_MODE_RPC;
pQuery.pCmdMsg = &pCmdMsg; pQuery.pCmdMsg = &pCmdMsg;
pQuery.msgType = pQuery.pCmdMsg->msgType; pQuery.msgType = pQuery.pCmdMsg->msgType;
...@@ -2396,7 +2404,7 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2396,7 +2404,7 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
end: end:
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
return code; return code;
...@@ -2409,7 +2417,7 @@ typedef struct SVgroupCreateTableBatch { ...@@ -2409,7 +2417,7 @@ typedef struct SVgroupCreateTableBatch {
} SVgroupCreateTableBatch; } SVgroupCreateTableBatch;
static void destroyCreateTbReqBatch(void* data) { static void destroyCreateTbReqBatch(void* data) {
SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*) data; SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
taosArrayDestroy(pTbBatch->req.pArray); taosArrayDestroy(pTbBatch->req.pArray);
} }
...@@ -2426,12 +2434,12 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2426,12 +2434,12 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
goto end; goto end;
} }
if(!pRequest->pDb){ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end; goto end;
} }
// decode and process req // decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead); int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len); tDecoderInit(&coder, data, len);
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
...@@ -2456,15 +2464,15 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2456,15 +2464,15 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
// loop to create table // loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName; SName pName;
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName); toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2496,7 +2504,7 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2496,7 +2504,7 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_CREATE_TABLE; pQuery->msgType = TDMT_VND_CREATE_TABLE;
pQuery->stableQuery = false; pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT); pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray); code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2504,10 +2512,10 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2504,10 +2512,10 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
} }
launchQueryImpl(pRequest, pQuery, false, NULL); launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end pQuery = NULL; // no need to free in the end
code = pRequest->code; code = pRequest->code;
end: end:
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
...@@ -2539,12 +2547,12 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2539,12 +2547,12 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
goto end; goto end;
} }
if(!pRequest->pDb){ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end; goto end;
} }
// decode and process req // decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead); int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len); tDecoderInit(&coder, data, len);
if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) { if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
...@@ -2569,15 +2577,15 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2569,15 +2577,15 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
// loop to create table // loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq; pDropReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName; SName pName;
toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName); toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2607,7 +2615,7 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2607,7 +2615,7 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_DROP_TABLE; pQuery->msgType = TDMT_VND_DROP_TABLE;
pQuery->stableQuery = false; pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT); pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray); code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2615,10 +2623,10 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2615,10 +2623,10 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
} }
launchQueryImpl(pRequest, pQuery, false, NULL); launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end pQuery = NULL; // no need to free in the end
code = pRequest->code; code = pRequest->code;
end: end:
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
...@@ -2641,12 +2649,12 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2641,12 +2649,12 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
goto end; goto end;
} }
if(!pRequest->pDb){ if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end; goto end;
} }
// decode and process req // decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead); int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len); tDecoderInit(&coder, data, len);
if (tDecodeSVAlterTbReq(&coder, &req) < 0) { if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
...@@ -2655,7 +2663,7 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2655,7 +2663,7 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
} }
// do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
if(req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS){ if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
goto end; goto end;
} }
...@@ -2667,12 +2675,12 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2667,12 +2675,12 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
} }
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName = {0}; SName pName = {0};
toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName); toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2706,7 +2714,7 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2706,7 +2714,7 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_ALTER_TABLE; pQuery->msgType = TDMT_VND_ALTER_TABLE;
pQuery->stableQuery = false; pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT); pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pArray); code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2714,14 +2722,14 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){ ...@@ -2714,14 +2722,14 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
} }
launchQueryImpl(pRequest, pQuery, false, NULL); launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end pQuery = NULL; // no need to free in the end
pVgData = NULL; pVgData = NULL;
pArray = NULL; pArray = NULL;
code = pRequest->code; code = pRequest->code;
end: end:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
if(pVgData) taosMemoryFreeClear(pVgData->pData); if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData); taosMemoryFreeClear(pVgData);
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
...@@ -2729,22 +2737,22 @@ end: ...@@ -2729,22 +2737,22 @@ end:
return code; return code;
} }
int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){ int32_t taos_write_raw_meta(TAOS* taos, tmq_raw_data* raw_meta) {
if (!taos || !raw_meta) { if (!taos || !raw_meta) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) { if (raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){ } else if (raw_meta->raw_meta_type == TDMT_VND_ALTER_STB) {
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){ } else if (raw_meta->raw_meta_type == TDMT_VND_DROP_STB) {
return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){ } else if (raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE) {
return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE){ } else if (raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE) {
return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE){ } else if (raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE) {
return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
} }
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
......
...@@ -45,6 +45,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); ...@@ -45,6 +45,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq); static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp);
int32_t mndInitStb(SMnode *pMnode) { int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
...@@ -854,6 +855,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { ...@@ -854,6 +855,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SMCreateStbReq createReq = {0}; SMCreateStbReq createReq = {0};
bool isAlter = false;
if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -889,6 +891,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { ...@@ -889,6 +891,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
code = 0; code = 0;
goto _OVER; goto _OVER;
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) { } else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
isAlter = true;
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name); mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
} else { } else {
mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name); mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
...@@ -929,7 +932,12 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { ...@@ -929,7 +932,12 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
code = mndCreateStb(pMnode, pReq, &createReq, pDb); if (isAlter) {
bool needRsp = false;
code = mndAlterStbImp(pMnode, pReq, pDb, pStb, needRsp);
} else {
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
}
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
...@@ -1495,14 +1503,13 @@ static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *t ...@@ -1495,14 +1503,13 @@ static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *t
return code; return code;
} }
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, int32_t *pLen) {
int32_t *pLen) {
int32_t ret; int32_t ret;
SEncoder ec = {0}; SEncoder ec = {0};
uint32_t contLen = 0; uint32_t contLen = 0;
SMAlterStbRsp alterRsp = {0}; SMAlterStbRsp alterRsp = {0};
SName name = {0}; SName name = {0};
tNameFromString(&name, pAlter->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp)); alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == alterRsp.pMeta) { if (NULL == alterRsp.pMeta) {
...@@ -1535,10 +1542,36 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S ...@@ -1535,10 +1542,36 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
return 0; return 0;
} }
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (needRsp) {
void *pCont = NULL;
int32_t contLen = 0;
if (mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen) != 0) goto _OVER;
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
mndTransDrop(pTrans);
return code;
}
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
bool needRsp = true; bool needRsp = true;
int32_t code = -1; int32_t code = -1;
STrans *pTrans = NULL;
SField *pField0 = NULL; SField *pField0 = NULL;
SStbObj stbObj = {0}; SStbObj stbObj = {0};
...@@ -1587,30 +1620,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p ...@@ -1587,30 +1620,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
} }
if (code != 0) goto _OVER; if (code != 0) goto _OVER;
code = mndAlterStbImp(pMnode, pReq, pDb, &stbObj, needRsp);
code = -1;
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (needRsp) {
void *pCont = NULL;
int32_t contLen = 0;
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER;
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER: _OVER:
mndTransDrop(pTrans);
taosMemoryFreeClear(stbObj.pTags); taosMemoryFreeClear(stbObj.pTags);
taosMemoryFreeClear(stbObj.pColumns); taosMemoryFreeClear(stbObj.pColumns);
return code; return code;
......
...@@ -89,6 +89,8 @@ typedef struct { ...@@ -89,6 +89,8 @@ typedef struct {
STqExecTb execTb; STqExecTb execTb;
STqExecDb execDb; STqExecDb execDb;
}; };
// TODO remove it
int64_t tsdbEndVer;
} STqExecHandle; } STqExecHandle;
...@@ -129,7 +131,7 @@ typedef struct { ...@@ -129,7 +131,7 @@ typedef struct {
static STqMgmt tqMgmt = {0}; static STqMgmt tqMgmt = {0};
// tqRead // tqRead
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset); int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec
......
...@@ -112,7 +112,8 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, ...@@ -112,7 +112,8 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
}; };
tmsgSendRsp(&resp); tmsgSendRsp(&resp);
tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64
", rspOffset:%" PRId64,
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset);
return 0; return 0;
...@@ -179,8 +180,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -179,8 +180,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, offset.subKey, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts); offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
} else if (offset.val.type == TMQ_OFFSET__LOG) { } else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.version); TD_VID(pTq->pVnode), offset.val.version);
...@@ -316,9 +317,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -316,9 +317,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
// 3.query // 3.query
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
fetchOffsetNew.version++; if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) { fetchOffsetNew.version++;
}
if (tqScan(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0); ASSERT(0);
code = -1; code = -1;
goto OVER; goto OVER;
...@@ -333,7 +336,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -333,7 +336,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto OVER; goto OVER;
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
...@@ -411,6 +414,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -411,6 +414,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
taosMemoryFree(pCkHead); taosMemoryFree(pCkHead);
#if 0
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts); tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
...@@ -421,6 +425,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -421,6 +425,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
} }
#endif
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) { } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0); ASSERT(0);
} }
...@@ -478,6 +483,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -478,6 +483,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
/*for (int32_t i = 0; i < 5; i++) {*/ /*for (int32_t i = 0; i < 5; i++) {*/
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/ /*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/ /*}*/
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->execHandle.execCol.qmsg = req.qmsg;
req.qmsg = NULL; req.qmsg = NULL;
...@@ -488,6 +494,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -488,6 +494,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.initTableReader = true, .initTableReader = true,
.initTqReader = true, .initTqReader = true,
.version = ver,
}; };
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task[i]);
...@@ -496,6 +503,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -496,6 +503,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(scanner); ASSERT(scanner);
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader[i]); ASSERT(pHandle->execHandle.pExecReader[i]);
pHandle->execHandle.tsdbEndVer = ver;
} }
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
......
...@@ -59,15 +59,17 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { ...@@ -59,15 +59,17 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return 0; return 0;
} }
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
qTaskInfo_t task = pExec->execCol.task[0]; qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset) < 0) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
pRsp->rspOffset.version--; pRsp->rspOffset.version--;
return 0; return 0;
} }
int32_t rowCnt = 0;
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
...@@ -77,11 +79,26 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff ...@@ -77,11 +79,26 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp); tqAddBlockDataToRsp(pDataBlock, pRsp);
pRsp->blockNum++;
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid; if (pOffset->type == TMQ_OFFSET__LOG) {
tqAddTbNameToRsp(pTq, uid, pRsp); int64_t uid = pExec->pExecReader[0]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
} else {
pRsp->withTbName = 0;
}
} }
pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) {
continue;
} else {
rowCnt += pDataBlock->info.rows;
if (rowCnt <= 4096) continue;
}
}
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1);
qStreamPrepareScan(task, pOffset);
continue; continue;
} }
...@@ -94,18 +111,19 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff ...@@ -94,18 +111,19 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
ASSERT(0); ASSERT(0);
} }
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { ASSERT(pRsp->rspOffset.type != 0);
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version); ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
} }
ASSERT(pRsp->rspOffset.type != 0);
break; break;
} }
return 0; return 0;
} }
#if 0
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) { int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId]; qTaskInfo_t task = pExec->execCol.task[workerId];
...@@ -153,6 +171,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ...@@ -153,6 +171,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
return 0; return 0;
} }
#endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
......
...@@ -478,7 +478,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void ...@@ -478,7 +478,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapWriterClose(pWriter, isApply); int32_t code = vnodeSnapWriterClose(pWriter, !isApply);
return code; return code;
} }
......
...@@ -166,7 +166,7 @@ typedef struct SCtgDBCache { ...@@ -166,7 +166,7 @@ typedef struct SCtgDBCache {
int8_t deleted; int8_t deleted;
SCtgVgCache vgCache; SCtgVgCache vgCache;
SHashObj *tbCache; // key:tbname, value:SCtgTbCache SHashObj *tbCache; // key:tbname, value:SCtgTbCache
SHashObj *stbCache; // key:suid, value:STableMeta* SHashObj *stbCache; // key:suid, value:char*
} SCtgDBCache; } SCtgDBCache;
typedef struct SCtgRentSlot { typedef struct SCtgRentSlot {
......
...@@ -144,6 +144,7 @@ typedef struct { ...@@ -144,6 +144,7 @@ typedef struct {
void* metaBlk; // for tmq fetching meta void* metaBlk; // for tmq fetching meta
SSDataBlock* pullOverBlk; // for streaming SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond; SWalFilterCond cond;
int64_t lastScanUid;
} SStreamTaskInfo; } SStreamTaskInfo;
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {
...@@ -291,10 +292,12 @@ typedef struct STableScanInfo { ...@@ -291,10 +292,12 @@ typedef struct STableScanInfo {
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
#if 0
struct { struct {
uint64_t uid; uint64_t uid;
int64_t ts; int64_t ts;
} lastStatus; } lastStatus;
#endif
int8_t scanMode; int8_t scanMode;
int8_t noTable; int8_t noTable;
......
...@@ -335,8 +335,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { ...@@ -335,8 +335,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pTableScanInfo->currentTable, tableSz); pTableScanInfo->currentTable, tableSz);
} else {
// switch to log
} }
} else { } else {
...@@ -352,6 +350,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { ...@@ -352,6 +350,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
return 0; return 0;
} }
#if 0
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
...@@ -371,3 +370,4 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) { ...@@ -371,3 +370,4 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
return doGetScanStatus(pTaskInfo->pRoot, uid, ts); return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
} }
#endif
...@@ -2467,7 +2467,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2467,7 +2467,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->binfo.pRes); blockDataDestroy(pInfo->binfo.pRes);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -2848,7 +2848,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -2848,7 +2848,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
} }
} }
} }
#if 0
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
uint8_t type = pOperator->operatorType; uint8_t type = pOperator->operatorType;
...@@ -2933,6 +2933,7 @@ int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { ...@@ -2933,6 +2933,7 @@ int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
// this is a blocking operator // this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
...@@ -3344,8 +3345,8 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult ...@@ -3344,8 +3345,8 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey int64_t ekey =
: pInfo->existNewGroupBlock->info.window.ekey; Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
...@@ -3681,14 +3682,14 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) { ...@@ -3681,14 +3682,14 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param; SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
cleanupBasicInfo(pInfo); cleanupBasicInfo(pInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -3697,7 +3698,7 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -3697,7 +3698,7 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(pInfo->p); taosMemoryFreeClear(pInfo->p);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -3709,7 +3710,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -3709,7 +3710,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo); taosArrayDestroy(pInfo->pPseudoColInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -3727,7 +3728,7 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -3727,7 +3728,7 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPseudoColInfo); taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup); cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -3746,7 +3747,7 @@ void doDestroyExchangeOperatorInfo(void* param) { ...@@ -3746,7 +3747,7 @@ void doDestroyExchangeOperatorInfo(void* param) {
} }
tsem_destroy(&pExInfo->ready); tsem_destroy(&pExInfo->ready);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -3975,7 +3976,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t ...@@ -3975,7 +3976,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id); pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
pInfo->win = win; pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
taosMemoryFree(pInfo->pFillInfo); taosMemoryFree(pInfo->pFillInfo);
taosMemoryFree(pInfo->p); taosMemoryFree(pInfo->p);
...@@ -4476,7 +4477,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4476,7 +4477,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pPhyNode->pConditions, pTaskInfo); pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
...@@ -4515,8 +4517,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4515,8 +4517,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pPhyNode->pConditions, pTaskInfo); pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
...@@ -4538,7 +4540,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4538,7 +4540,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode); SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, pTaskInfo); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
...@@ -4797,7 +4800,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT ...@@ -4797,7 +4800,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pInserterParam->readHandle = readHandle; pInserterParam->readHandle = readHandle;
*pParam = pInserterParam; *pParam = pInserterParam;
break; break;
} }
......
...@@ -422,8 +422,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -422,8 +422,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
// todo refactor // todo refactor
pTableScanInfo->lastStatus.uid = pBlock->info.uid; /*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/
pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey; /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.uid != 0); ASSERT(pBlock->info.uid != 0);
return pBlock; return pBlock;
...@@ -1191,9 +1194,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1191,9 +1194,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter(pInfo->pCondition, pInfo->pRes); doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) {
return 0;
}
return 0; return 0;
} }
...@@ -1219,7 +1219,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1219,7 +1219,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/ /*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
/*} else {*/ } else {
// data is filtered out, do clean
/*tDeleteSSDataBlock(&ret.data);*/ /*tDeleteSSDataBlock(&ret.data);*/
} }
} else if (ret.fetchType == FETCH_TYPE__META) { } else if (ret.fetchType == FETCH_TYPE__META) {
...@@ -1228,13 +1230,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1228,13 +1230,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.metaBlk = ret.meta; pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL; return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) { } else if (ret.fetchType == FETCH_TYPE__NONE) {
/*if (ret.offset.version == -1) {*/
/*pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;*/
/*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
/*} else {*/
pTaskInfo->streamInfo.lastStatus = ret.offset; pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version); ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
/*}*/
return NULL; return NULL;
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -1354,72 +1351,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1354,72 +1351,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
pInfo->pRes->info.rows = block.info.rows; setBlockIntoRes(pInfo, &block);
pInfo->pRes->info.uid = block.info.uid;
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = block.info.rows;
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
} else {
pInfo->pRes->info.groupId = 0;
}
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = block.info.uid;
}
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
bool colExists = false;
for (int32_t j = 0; j < blockDataGetNumOfCols(&block); ++j) {
SColumnInfoData* pResCol = bdGetColumnInfoData(&block, j);
if (pResCol->info.colId == pColMatchInfo->colId) {
taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
colExists = true;
break;
}
}
// the required column does not exists in submit block, let's set it to be all null value
if (!colExists) {
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
}
}
taosArrayDestroy(block.pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return NULL;
}
#endif
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
}
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
break; break;
} }
...@@ -1449,12 +1382,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1449,12 +1382,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
#if 0
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) { } else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
/*ASSERT(0);*/ ASSERT(0);
// check reader last status // check reader last status
// if not match, reset status // if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL; return pResult && pResult->info.rows > 0 ? pResult : NULL;
#endif
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -1518,6 +1453,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1518,6 +1453,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle) { if (pHandle) {
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info; STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
if (pHandle->version > 0) {
pSTInfo->cond.endVersion = pHandle->version;
}
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "tutil.h" #include "tutil.h"
extern const uint8_t COMMON_INPUTS[]; extern const uint8_t COMMON_INPUTS[];
extern const char COMMON_INPUTS_INV[]; extern const char COMMON_INPUTS_INV[];
extern const int32_t COMMON_INPUTS_LEN;
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -29,16 +29,16 @@ extern "C" { ...@@ -29,16 +29,16 @@ extern "C" {
typedef struct FstDfa FstDfa; typedef struct FstDfa FstDfa;
typedef struct { typedef struct {
SArray * insts; SArray *insts;
uint32_t next[256]; uint32_t next[256];
bool isMatch; bool isMatch;
} State; } DfaState;
/* /*
* dfa builder related func * dfa builder related func
**/ **/
typedef struct FstDfaBuilder { typedef struct FstDfaBuilder {
FstDfa * dfa; FstDfa *dfa;
SHashObj *cache; SHashObj *cache;
} FstDfaBuilder; } FstDfaBuilder;
......
...@@ -65,6 +65,7 @@ typedef struct { ...@@ -65,6 +65,7 @@ typedef struct {
} FstRegex; } FstRegex;
FstRegex *regexCreate(const char *str); FstRegex *regexCreate(const char *str);
void regexDestroy(FstRegex *regex);
uint32_t regexAutomStart(FstRegex *regex); uint32_t regexAutomStart(FstRegex *regex);
bool regexAutomIsMatch(FstRegex *regex, uint32_t state); bool regexAutomIsMatch(FstRegex *regex, uint32_t state);
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000 #define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD 64 * 1024 #define MEM_THRESHOLD 512 * 1024
#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20 #define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20
#define MEM_ESTIMATE_RADIO 1.5 #define MEM_ESTIMATE_RADIO 1.5
...@@ -204,7 +204,6 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr ...@@ -204,7 +204,6 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
if (0 == strcmp(c->colVal, pCt->colVal)) { if (0 == strcmp(c->colVal, pCt->colVal)) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue; *s = kTypeValue;
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
...@@ -309,7 +308,6 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR ...@@ -309,7 +308,6 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
if (cond == MATCH) { if (cond == MATCH) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue; *s = kTypeValue;
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
......
...@@ -1307,7 +1307,6 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) { ...@@ -1307,7 +1307,6 @@ FStmStRslt* stmStNextWith(FStmSt* sws, StreamCallback callback) {
taosArrayPush(sws->inp, &(trn.inp)); taosArrayPush(sws->inp, &(trn.inp));
if (FST_NODE_IS_FINAL(nextNode)) { if (FST_NODE_IS_FINAL(nextNode)) {
// void *eofState = sws->aut->acceptEof(nextState);
void* eofState = automFuncs[aut->type].acceptEof(aut, nextState); void* eofState = automFuncs[aut->type].acceptEof(aut, nextState);
if (eofState != NULL) { if (eofState != NULL) {
isMatch = automFuncs[aut->type].isMatch(aut, eofState); isMatch = automFuncs[aut->type].isMatch(aut, eofState);
......
...@@ -294,3 +294,4 @@ const char COMMON_INPUTS_INV[] = { ...@@ -294,3 +294,4 @@ const char COMMON_INPUTS_INV[] = {
'\xee', '\xef', '\xf0', '\xf1', '\xf2', '\xf3', '\xf4', '\xf5', '\xf6', '\xf7', '\xf8', '\xf9', '\xfa', '\xfb', '\xee', '\xef', '\xf0', '\xf1', '\xf2', '\xf3', '\xf4', '\xf5', '\xf6', '\xf7', '\xf8', '\xf9', '\xfa', '\xfb',
'\xfc', '\xfd', '\xfe', '\xff', '\xfc', '\xfd', '\xfe', '\xff',
}; };
const int32_t COMMON_INPUTS_LEN = sizeof(COMMON_INPUTS) / sizeof(COMMON_INPUTS[0]);
...@@ -41,7 +41,7 @@ FstDfaBuilder *dfaBuilderCreate(SArray *insts) { ...@@ -41,7 +41,7 @@ FstDfaBuilder *dfaBuilderCreate(SArray *insts) {
return NULL; return NULL;
} }
SArray *states = taosArrayInit(4, sizeof(State)); SArray *states = taosArrayInit(4, sizeof(DfaState));
builder->dfa = dfaCreate(insts, states); builder->dfa = dfaCreate(insts, states);
builder->cache = taosHashInit( builder->cache = taosHashInit(
...@@ -71,7 +71,7 @@ FstDfa *dfaBuilder(FstDfaBuilder *builder) { ...@@ -71,7 +71,7 @@ FstDfa *dfaBuilder(FstDfaBuilder *builder) {
dfaAdd(builder->dfa, cur, 0); dfaAdd(builder->dfa, cur, 0);
SArray * states = taosArrayInit(0, sizeof(uint32_t)); SArray *states = taosArrayInit(0, sizeof(uint32_t));
uint32_t result; uint32_t result;
if (dfaBuilderCachedState(builder, cur, &result)) { if (dfaBuilderCachedState(builder, cur, &result)) {
taosArrayPush(states, &result); taosArrayPush(states, &result);
...@@ -98,10 +98,12 @@ FstDfa *dfaBuilder(FstDfaBuilder *builder) { ...@@ -98,10 +98,12 @@ FstDfa *dfaBuilder(FstDfaBuilder *builder) {
return builder->dfa; return builder->dfa;
} }
FstDfa *dfaBuilderBuild(FstDfaBuilder *builer) { return NULL; }
bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet *next, uint32_t state, uint8_t byte, bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet *next, uint32_t state, uint8_t byte,
uint32_t *result) { uint32_t *result) {
sparSetClear(cur); sparSetClear(cur);
State *t = taosArrayGet(builder->dfa->states, state); DfaState *t = taosArrayGet(builder->dfa->states, state);
for (int i = 0; i < taosArrayGetSize(t->insts); i++) { for (int i = 0; i < taosArrayGetSize(t->insts); i++) {
uint32_t ip = *(int32_t *)taosArrayGet(t->insts, i); uint32_t ip = *(int32_t *)taosArrayGet(t->insts, i);
sparSetAdd(cur, ip); sparSetAdd(cur, ip);
...@@ -144,7 +146,7 @@ bool dfaBuilderCachedState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t * ...@@ -144,7 +146,7 @@ bool dfaBuilderCachedState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *
*result = *v; *result = *v;
taosArrayDestroy(tinsts); taosArrayDestroy(tinsts);
} else { } else {
State st; DfaState st;
st.insts = tinsts; st.insts = tinsts;
st.isMatch = isMatch; st.isMatch = isMatch;
taosArrayPush(builder->dfa->states, &st); taosArrayPush(builder->dfa->states, &st);
...@@ -169,14 +171,14 @@ bool dfaIsMatch(FstDfa *dfa, uint32_t si) { ...@@ -169,14 +171,14 @@ bool dfaIsMatch(FstDfa *dfa, uint32_t si) {
if (dfa->states == NULL || si < taosArrayGetSize(dfa->states)) { if (dfa->states == NULL || si < taosArrayGetSize(dfa->states)) {
return false; return false;
} }
State *st = taosArrayGet(dfa->states, si); DfaState *st = taosArrayGet(dfa->states, si);
return st != NULL ? st->isMatch : false; return st != NULL ? st->isMatch : false;
} }
bool dfaAccept(FstDfa *dfa, uint32_t si, uint8_t byte, uint32_t *result) { bool dfaAccept(FstDfa *dfa, uint32_t si, uint8_t byte, uint32_t *result) {
if (dfa->states == NULL || si < taosArrayGetSize(dfa->states)) { if (dfa->states == NULL || si < taosArrayGetSize(dfa->states)) {
return false; return false;
} }
State *st = taosArrayGet(dfa->states, si); DfaState *st = taosArrayGet(dfa->states, si);
*result = st->next[byte]; *result = st->next[byte];
return true; return true;
} }
......
...@@ -23,7 +23,7 @@ FstRegex *regexCreate(const char *str) { ...@@ -23,7 +23,7 @@ FstRegex *regexCreate(const char *str) {
return NULL; return NULL;
} }
int32_t sz = (int32_t)strlen(str); int32_t sz = (int32_t)strlen(str);
char * orig = taosMemoryCalloc(1, sz); char *orig = taosMemoryCalloc(1, sz);
memcpy(orig, str, sz); memcpy(orig, str, sz);
regex->orig = orig; regex->orig = orig;
...@@ -36,6 +36,12 @@ FstRegex *regexCreate(const char *str) { ...@@ -36,6 +36,12 @@ FstRegex *regexCreate(const char *str) {
return regex; return regex;
} }
void regexDestroy(FstRegex *regex) {
if (regex == NULL) return;
taosMemoryFree(regex->orig);
taosMemoryFree(regex);
}
uint32_t regexAutomStart(FstRegex *regex) { uint32_t regexAutomStart(FstRegex *regex) {
///// no nothing ///// no nothing
return 0; return 0;
......
...@@ -4,6 +4,7 @@ IF(NOT TD_DARWIN) ...@@ -4,6 +4,7 @@ IF(NOT TD_DARWIN)
add_executable(idxFstUT "") add_executable(idxFstUT "")
add_executable(idxUtilUT "") add_executable(idxUtilUT "")
add_executable(idxJsonUT "") add_executable(idxJsonUT "")
add_executable(idxFstUtilUT "")
target_sources(idxTest target_sources(idxTest
PRIVATE PRIVATE
...@@ -23,6 +24,25 @@ IF(NOT TD_DARWIN) ...@@ -23,6 +24,25 @@ IF(NOT TD_DARWIN)
"utilUT.cc" "utilUT.cc"
) )
target_sources(idxJsonUT
PRIVATE
"jsonUT.cc"
)
target_sources(idxFstUtilUT
PRIVATE
"fstUtilUT.cc"
)
target_include_directories (idxTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxFstTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_sources(idxJsonUT target_sources(idxJsonUT
PRIVATE PRIVATE
"jsonUT.cc" "jsonUT.cc"
...@@ -50,6 +70,38 @@ IF(NOT TD_DARWIN) ...@@ -50,6 +70,38 @@ IF(NOT TD_DARWIN)
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories (idxJsonUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxFstUtilUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (idxTest
os
util
common
gtest_main
index
)
target_link_libraries (idxFstTest
os
util
common
gtest_main
index
)
target_link_libraries (idxFstUT
os
util
common
gtest_main
index
)
target_include_directories (idxJsonUT target_include_directories (idxJsonUT
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/index" "${TD_SOURCE_DIR}/include/libs/index"
...@@ -92,15 +144,28 @@ IF(NOT TD_DARWIN) ...@@ -92,15 +144,28 @@ IF(NOT TD_DARWIN)
gtest_main gtest_main
index index
) )
target_link_libraries (idxFstUtilUT
add_test( os
NAME idxtest util
COMMAND idxTest common
gtest_main
index
) )
add_test( add_test(
NAME idxJsonUT NAME idxJsonUT
COMMAND idxJsonUT COMMAND idxJsonUT
) )
add_test(
NAME idxFstUtilUT
COMMAND idxFstUtilUT
)
add_test(
NAME idxtest
COMMAND idxTest
)
add_test( add_test(
NAME idxUtilUT NAME idxUtilUT
COMMAND idxUtilUT COMMAND idxUtilUT
...@@ -109,4 +174,4 @@ IF(NOT TD_DARWIN) ...@@ -109,4 +174,4 @@ IF(NOT TD_DARWIN)
NAME idxFstUT NAME idxFstUT
COMMAND idxFstUT COMMAND idxFstUT
) )
ENDIF () ENDIF ()
\ No newline at end of file
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstDfa.h"
#include "indexFstRegex.h"
#include "indexFstSparse.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
#include "tglobal.h"
#include "tlog.h"
#include "tskiplist.h"
#include "tutil.h"
class FstUtilEnv : public ::testing::Test {
protected:
virtual void SetUp() {
SArray *inst = taosArrayInit(4, sizeof(char));
builder = dfaBuilderCreate(inst);
}
virtual void TearDown() { dfaBuilderDestroy(builder); }
FstDfaBuilder *builder;
};
class FstRegexEnv : public ::testing::Test {
protected:
virtual void SetUp() { regex = regexCreate("test"); }
virtual void TearDown() { regexDestroy(regex); }
FstRegex *regex;
};
class FstSparseSetEnv : public ::testing::Test {
protected:
virtual void SetUp() { set = sparSetCreate(256); }
virtual void TearDown() {
// tear down
sparSetDestroy(set);
}
void ReBuild(int32_t sz) {
sparSetDestroy(set);
set = sparSetCreate(sz);
}
FstSparseSet *set;
};
// test FstDfaBuilder
TEST_F(FstUtilEnv, test1) {}
TEST_F(FstUtilEnv, test2) {}
TEST_F(FstUtilEnv, test3) {}
TEST_F(FstUtilEnv, test4) {}
// test FstRegex
TEST_F(FstRegexEnv, test1) {}
TEST_F(FstRegexEnv, test2) {}
TEST_F(FstRegexEnv, test3) {}
TEST_F(FstRegexEnv, test4) {}
// test FstSparseSet
TEST_F(FstSparseSetEnv, test1) {}
TEST_F(FstSparseSetEnv, test2) {}
TEST_F(FstSparseSetEnv, test3) {}
TEST_F(FstSparseSetEnv, test4) {}
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "indexCache.h" #include "indexCache.h"
#include "indexComm.h" #include "indexComm.h"
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCommon.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexTfile.h" #include "indexTfile.h"
...@@ -356,3 +357,11 @@ TEST_F(UtilEnv, TempResultExcept) { ...@@ -356,3 +357,11 @@ TEST_F(UtilEnv, TempResultExcept) {
idxTRsltMergeTo(relt, f); idxTRsltMergeTo(relt, f);
EXPECT_EQ(taosArrayGetSize(f), 1); EXPECT_EQ(taosArrayGetSize(f), 1);
} }
TEST_F(UtilEnv, testDictComm) {
int32_t count = COMMON_INPUTS_LEN;
for (int i = 0; i < 256; i++) {
uint8_t v = COMMON_INPUTS_INV[i];
EXPECT_EQ(COMMON_INPUTS[v], i);
}
}
...@@ -517,7 +517,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { ...@@ -517,7 +517,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->queryType = qwMsg->msgType; ctx->queryType = qwMsg->msgType;
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); //QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
......
...@@ -578,7 +578,7 @@ _return: ...@@ -578,7 +578,7 @@ _return:
SCL_RET(code); SCL_RET(code);
} }
EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) { EDealRes sclRewriteNullInOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
if (opType <= OP_TYPE_CALC_MAX) { if (opType <= OP_TYPE_CALC_MAX) {
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) { if (NULL == res) {
...@@ -610,6 +610,24 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT ...@@ -610,6 +610,24 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
EDealRes sclAggFuncWalker(SNode* pNode, void* pContext) {
if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
*(bool*)pContext = fmIsAggFunc(pFunc->funcId);
if (*(bool*)pContext) {
return DEAL_RES_END;
}
}
return DEAL_RES_CONTINUE;
}
bool sclContainsAggFuncNode(SNode* pNode) {
bool aggFunc = false;
nodesWalkExpr(pNode, sclAggFuncWalker, (void *)&aggFunc);
return aggFunc;
}
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
SOperatorNode *node = (SOperatorNode *)*pNode; SOperatorNode *node = (SOperatorNode *)*pNode;
...@@ -617,8 +635,9 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { ...@@ -617,8 +635,9 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) { if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
SValueNode *valueNode = (SValueNode *)node->pLeft; SValueNode *valueNode = (SValueNode *)node->pLeft;
if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) { if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)
return sclRewriteBasedOnOptr(pNode, ctx, node->opType); && (!sclContainsAggFuncNode(node->pRight))) {
return sclRewriteNullInOptr(pNode, ctx, node->opType);
} }
if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight)
...@@ -633,8 +652,9 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { ...@@ -633,8 +652,9 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) { if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
SValueNode *valueNode = (SValueNode *)node->pRight; SValueNode *valueNode = (SValueNode *)node->pRight;
if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) { if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)
return sclRewriteBasedOnOptr(pNode, ctx, node->opType); && (!sclContainsAggFuncNode(node->pLeft))) {
return sclRewriteNullInOptr(pNode, ctx, node->opType);
} }
if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft)
...@@ -656,7 +676,7 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { ...@@ -656,7 +676,7 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
ERASE_NODE(listNode->pNodeList); ERASE_NODE(listNode->pNodeList);
continue; continue;
} else { //OP_TYPE_NOT_IN } else { //OP_TYPE_NOT_IN
return sclRewriteBasedOnOptr(pNode, ctx, node->opType); return sclRewriteNullInOptr(pNode, ctx, node->opType);
} }
} }
...@@ -664,7 +684,7 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { ...@@ -664,7 +684,7 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
} }
if (listNode->pNodeList->length <= 0) { if (listNode->pNodeList->length <= 0) {
return sclRewriteBasedOnOptr(pNode, ctx, node->opType); return sclRewriteNullInOptr(pNode, ctx, node->opType);
} }
} }
......
...@@ -120,7 +120,7 @@ python3 ./test.py -f 2-query/irate.py ...@@ -120,7 +120,7 @@ python3 ./test.py -f 2-query/irate.py
python3 ./test.py -f 2-query/and_or_for_byte.py python3 ./test.py -f 2-query/and_or_for_byte.py
python3 ./test.py -f 2-query/function_null.py python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 2-query/queryQnode.py #python3 ./test.py -f 2-query/queryQnode.py
#python3 ./test.py -f 6-cluster/5dnode1mnode.py #python3 ./test.py -f 6-cluster/5dnode1mnode.py
#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 #python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册