提交 e3c4215a 编写于 作者: H Haojun Liao

Merge branch '3.0' into feature/3_liaohj

...@@ -26,6 +26,14 @@ static void msg_process(TAOS_RES* msg) { ...@@ -26,6 +26,14 @@ static void msg_process(TAOS_RES* msg) {
printf("topic: %s\n", tmq_get_topic_name(msg)); printf("topic: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg)); printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg));
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
void* meta;
int32_t metaLen;
tmq_get_raw_meta(msg, &meta, &metaLen);
printf("meta, len is %d\n", metaLen);
return;
}
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
...@@ -129,8 +137,8 @@ int32_t create_topic() { ...@@ -129,8 +137,8 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -191,7 +199,7 @@ tmq_t* build_consumer() { ...@@ -191,7 +199,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experiment.use.snapshot", "true"); tmq_conf_set(conf, "experiment.use.snapshot", "false");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
......
...@@ -261,7 +261,7 @@ enum tmq_res_t { ...@@ -261,7 +261,7 @@ enum tmq_res_t {
typedef enum tmq_res_t tmq_res_t; typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, const void **raw_meta, int32_t *raw_meta_len); DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
......
...@@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pResultInfo->current += 1; pResultInfo->current += 1;
return pResultInfo->row; return pResultInfo->row;
} }
} else if (TD_RES_TMQ_META(res)) {
return NULL;
} else { } else {
// assert to avoid un-initialization error // assert to avoid un-initialization error
ASSERT(0); ASSERT(0);
......
...@@ -1160,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -1160,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
// handle meta rsp // handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
}
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
...@@ -1174,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -1174,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
pRspWrapper->vgHandle = pVg; pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic; pRspWrapper->topicHandle = pTopic;
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
if (rspType == TMQ_MSG_TYPE__POLL_RSP) { if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp); tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);
} else { } else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp); tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId,
pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset); pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset, rspType);
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -1558,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* ...@@ -1558,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
pRspObj->resType = RES_TYPE__TMQ; pRspObj->resType = RES_TYPE__TMQ_META;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->vgId = pWrapper->vgHandle->vgId;
...@@ -1676,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) ...@@ -1676,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return 0; return 0;
} }
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
while (1) { while (1) {
SMqRspWrapper* rspWrapper = NULL; SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper); taosGetQitem(tmq->qall, (void**)&rspWrapper);
...@@ -1716,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1716,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp // build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch); pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else { } else {
...@@ -1744,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1744,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} }
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
SMqRspObj* rspObj; void* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
#if 0 #if 0
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
...@@ -1873,7 +1871,7 @@ const char* tmq_get_table_name(TAOS_RES* res) { ...@@ -1873,7 +1871,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL; return NULL;
} }
int32_t tmq_get_raw_meta(TAOS_RES* res, const void** raw_meta, int32_t* raw_meta_len) { int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) {
if (TD_RES_TMQ_META(res)) { if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
*raw_meta = pMetaRspObj->metaRsp.metaRsp; *raw_meta = pMetaRspObj->metaRsp.metaRsp;
......
...@@ -76,22 +76,28 @@ void deltaToUtcInitOnce() { ...@@ -76,22 +76,28 @@ void deltaToUtcInitOnce() {
static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim);
static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim);
static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim);
static char* forwardToTimeStringEnd(char* str); static char* forwardToTimeStringEnd(char* str);
static bool checkTzPresent(const char* str, int32_t len); static bool checkTzPresent(const char* str, int32_t len);
static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec) = {parseLocaltime, static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) = {parseLocaltime,
parseLocaltimeDst}; parseLocaltimeDst};
int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) { int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */ /* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) { if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, utime, timePrec, 'T'); if (checkTzPresent(timestr, len)) {
} else if (checkTzPresent(timestr, len)) { return parseTimeWithTz(timestr, utime, timePrec, 'T');
return parseTimeWithTz(timestr, utime, timePrec, 0); } else {
return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec, 'T');
}
} else { } else {
return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec); if (checkTzPresent(timestr, len)) {
return parseTimeWithTz(timestr, utime, timePrec, 0);
} else {
return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec, 0);
}
} }
} }
...@@ -333,13 +339,25 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) { ...@@ -333,13 +339,25 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) {
return true; return true;
} }
int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) {
*time = 0; *time = 0;
struct tm tm = {0}; struct tm tm = {0};
char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); char *str;
if (delim == 'T') {
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
} else if (delim == 0) {
str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
} else {
str = NULL;
}
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1; //if parse failed, try "%Y-%m-%d" format
str = taosStrpTime(timestr, "%Y-%m-%d", &tm);
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1;
}
} }
#ifdef _MSC_VER #ifdef _MSC_VER
...@@ -367,14 +385,26 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr ...@@ -367,14 +385,26 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr
return 0; return 0;
} }
int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) {
*time = 0; *time = 0;
struct tm tm = {0}; struct tm tm = {0};
tm.tm_isdst = -1; tm.tm_isdst = -1;
char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); char *str;
if (delim == 'T') {
str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
} else if (delim == 0) {
str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
} else {
str = NULL;
}
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1; //if parse failed, try "%Y-%m-%d" format
str = taosStrpTime(timestr, "%Y-%m-%d", &tm);
if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) {
return -1;
}
} }
/* mktime will be affected by TZ, set by using taos_options */ /* mktime will be affected by TZ, set by using taos_options */
......
...@@ -252,15 +252,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -252,15 +252,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
#if 1 #if 1
if (pReq->useSnapshot) { if (pReq->useSnapshot) {
tqInfo("retrieve using snapshot"); // TODO set ver into snapshot
int64_t lastVer = walGetCommittedVer(pTq->pWal); int64_t lastVer = walGetCommittedVer(pTq->pWal);
if (rsp.reqOffset < lastVer) { if (rsp.reqOffset < lastVer) {
tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer);
tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId); tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId);
if (rsp.blockNum != 0) { if (rsp.blockNum != 0) {
rsp.withTbName = false; rsp.withTbName = false;
rsp.rspOffset = lastVer; rsp.rspOffset = lastVer;
tqInfo("direct send by snapshot rsp offset %ld", lastVer); tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset);
fetchOffset = lastVer;
goto SEND_RSP; goto SEND_RSP;
} }
} }
...@@ -304,7 +306,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -304,7 +306,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE || pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE || pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
pHead->msgType == TDMT_VND_DROP_TTL_TABLE); pHead->msgType == TDMT_VND_DROP_TTL_TABLE);
// return tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->currentOffset; metaRsp.reqOffset = pReq->currentOffset;
metaRsp.rspOffset = fetchOffset; metaRsp.rspOffset = fetchOffset;
...@@ -387,6 +389,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -387,6 +389,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->epoch = -1; pHandle->epoch = -1;
pHandle->execHandle.subType = req.subType; pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta;
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
......
...@@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo { ...@@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo {
SReadHandle readHandle; SReadHandle readHandle;
uint64_t tableUid; // queried super table uid uint64_t tableUid; // queried super table uid
EStreamScanMode scanMode; EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy; SOperatorInfo* pSnapshotReadOp;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
......
...@@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_ ...@@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_
} }
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); colDataAppend(pColInfoData, i, data,
(data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
} }
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
...@@ -774,7 +775,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -774,7 +775,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
if (!needRead) { if (!needRead) {
return false; return false;
} }
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0; pTableScanInfo->curTWinIdx = 0;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
...@@ -819,11 +820,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_ ...@@ -819,11 +820,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pOperatorDumy); pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult == NULL) { if (pResult == NULL) {
if (prepareDataScan(pInfo)) { if (prepareDataScan(pInfo)) {
// scan next window data // scan next window data
pResult = doTableScan(pInfo->pOperatorDumy); pResult = doTableScan(pInfo->pSnapshotReadOp);
} }
} }
if (!pResult) { if (!pResult) {
...@@ -857,11 +858,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa ...@@ -857,11 +858,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
blockDataEnsureCapacity(pUpdateBlock, size); blockDataEnsureCapacity(pUpdateBlock, size);
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
int32_t i = 0; int32_t i = 0;
for (; i < size; i++) { for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
if (pInfo->groupId != id) { if (pInfo->groupId != id) {
break; break;
} }
...@@ -1061,7 +1062,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -1061,7 +1062,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) { } else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
SSDataBlock* pResult = doTableScan(pInfo->pOperatorDumy); SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
if (pResult) { if (pResult) {
return pResult->info.rows > 0 ? pResult : NULL; return pResult->info.rows > 0 ? pResult : NULL;
} }
...@@ -1133,7 +1134,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan ...@@ -1133,7 +1134,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
} else { } else {
pInfo->pUpdateInfo = NULL; pInfo->pUpdateInfo = NULL;
} }
pInfo->pOperatorDumy = pTableScanDummy; pInfo->pSnapshotReadOp = pTableScanDummy;
pInfo->interval = pSTInfo->interval; pInfo->interval = pSTInfo->interval;
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
...@@ -1545,7 +1546,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1545,7 +1546,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE : TDMT_MND_SYSTABLE_RETRIEVE; int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
: TDMT_MND_SYSTABLE_RETRIEVE;
pMsgSendInfo->param = pOperator; pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1; pMsgSendInfo->msgInfo.pData = buf1;
...@@ -1831,7 +1833,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1831,7 +1833,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} else { } else {
data = (char*)p; data = (char*)p;
} }
colDataAppend(pDst, count, data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); colDataAppend(pDst, count, data,
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
data != NULL) { data != NULL) {
...@@ -1884,11 +1887,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -1884,11 +1887,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error; goto _error;
} }
pInfo->pTableList = pTableListInfo; pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList; pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createResDataBlock(pDescNode);
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pOperator->name = "TagScanOperator"; pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
......
...@@ -2235,7 +2235,8 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) { ...@@ -2235,7 +2235,8 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) {
} }
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
ASSERT(pDest->info.capacity >= pSource->info.rows); // ASSERT(pDest->info.capacity >= pSource->info.rows);
blockDataEnsureCapacity(pDest, pSource->info.rows);
clearUpdateDataBlock(pDest); clearUpdateDataBlock(pDest);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0);
SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex); SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册