diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 980c86b8778bd693359833f9a4f81cab6819cb4f..6fb7e7a1fce8677870abd3f1489dce2bd8b678b2 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -26,6 +26,14 @@ static void msg_process(TAOS_RES* msg) { printf("topic: %s\n", tmq_get_topic_name(msg)); printf("db: %s\n", tmq_get_db_name(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg)); + if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { + void* meta; + int32_t metaLen; + tmq_get_raw_meta(msg, &meta, &metaLen); + + printf("meta, len is %d\n", metaLen); + return; + } while (1) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; @@ -129,8 +137,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); + /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -191,7 +199,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "experiment.use.snapshot", "true"); + tmq_conf_set(conf, "experiment.use.snapshot", "false"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); diff --git a/include/client/taos.h b/include/client/taos.h index 9d4da221f4dbcf4ee5b2c6c36b497a350cb60f5c..d31d5c582cd128db3b5217698a27dd92d8a3d108 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -261,7 +261,7 @@ enum tmq_res_t { typedef enum tmq_res_t tmq_res_t; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); -DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, const void **raw_meta, int32_t *raw_meta_len); +DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ab4d14b1cc4c6b17ebec20fb0c5c234b4ffcaea6..22747425f078ed16df9e76a0e1eb0a647f56241c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { pResultInfo->current += 1; return pResultInfo->row; } + } else if (TD_RES_TMQ_META(res)) { + return NULL; } else { // assert to avoid un-initialization error ASSERT(0); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 949965296ff4180e17af6c137a2e02a4e9e0fe03..637a7ee5dd1073c69de38bcba2f2e973eb49d987 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1160,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { // handle meta rsp int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; - if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { - } SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { @@ -1174,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { pRspWrapper->vgHandle = pVg; pRspWrapper->topicHandle = pTopic; - memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - if (rspType == TMQ_MSG_TYPE__POLL_RSP) { + memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp); } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); + memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp); } taosMemoryFree(pMsg->pData); - tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, - pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset); + tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId, + pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset, rspType); taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1558,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); - pRspObj->resType = RES_TYPE__TMQ; + pRspObj->resType = RES_TYPE__TMQ_META; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; @@ -1676,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) return 0; } -SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { +void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { while (1) { SMqRspWrapper* rspWrapper = NULL; taosGetQitem(tmq->qall, (void**)&rspWrapper); @@ -1716,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); - if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { + if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ - pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; + pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp - SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); return pRsp; } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", - pollRspWrapper->dataRsp.head.epoch, consumerEpoch); + pollRspWrapper->metaRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } } else { @@ -1744,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { - SMqRspObj* rspObj; - int64_t startTime = taosGetTimestampMs(); + void* rspObj; + int64_t startTime = taosGetTimestampMs(); #if 0 tmqHandleAllDelayedTask(tmq); @@ -1873,7 +1871,7 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } -int32_t tmq_get_raw_meta(TAOS_RES* res, const void** raw_meta, int32_t* raw_meta_len) { +int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) { if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; *raw_meta = pMetaRspObj->metaRsp.metaRsp; diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 0b59e9b6cc611a9da3362fbef3da410376c0ea58..8fbdeb06545d9bdd31db2e630b8a7b7c52281587 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -76,22 +76,28 @@ void deltaToUtcInitOnce() { static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); -static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); -static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); +static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim); +static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim); static char* forwardToTimeStringEnd(char* str); static bool checkTzPresent(const char* str, int32_t len); -static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec) = {parseLocaltime, - parseLocaltimeDst}; +static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) = {parseLocaltime, + parseLocaltimeDst}; int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { - return parseTimeWithTz(timestr, utime, timePrec, 'T'); - } else if (checkTzPresent(timestr, len)) { - return parseTimeWithTz(timestr, utime, timePrec, 0); + if (checkTzPresent(timestr, len)) { + return parseTimeWithTz(timestr, utime, timePrec, 'T'); + } else { + return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec, 'T'); + } } else { - return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec); + if (checkTzPresent(timestr, len)) { + return parseTimeWithTz(timestr, utime, timePrec, 0); + } else { + return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec, 0); + } } } @@ -333,13 +339,25 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) { return true; } -int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { +int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) { *time = 0; struct tm tm = {0}; - char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); + char *str; + if (delim == 'T') { + str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm); + } else if (delim == 0) { + str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); + } else { + str = NULL; + } + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { - return -1; + //if parse failed, try "%Y-%m-%d" format + str = taosStrpTime(timestr, "%Y-%m-%d", &tm); + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { + return -1; + } } #ifdef _MSC_VER @@ -367,14 +385,26 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr return 0; } -int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { +int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) { *time = 0; struct tm tm = {0}; tm.tm_isdst = -1; - char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); + char *str; + if (delim == 'T') { + str = taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm); + } else if (delim == 0) { + str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); + } else { + str = NULL; + } + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { - return -1; + //if parse failed, try "%Y-%m-%d" format + str = taosStrpTime(timestr, "%Y-%m-%d", &tm); + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { + return -1; + } } /* mktime will be affected by TZ, set by using taos_options */ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b23ec1310562b481d975da2c765ff45455e0e9df..cca3a5858814e54d32fb8965326b3786e8f3f642 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -252,15 +252,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { #if 1 if (pReq->useSnapshot) { - tqInfo("retrieve using snapshot"); + // TODO set ver into snapshot int64_t lastVer = walGetCommittedVer(pTq->pWal); if (rsp.reqOffset < lastVer) { + tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer); tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId); if (rsp.blockNum != 0) { rsp.withTbName = false; rsp.rspOffset = lastVer; - tqInfo("direct send by snapshot rsp offset %ld", lastVer); + tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset); + fetchOffset = lastVer; goto SEND_RSP; } } @@ -304,7 +306,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE || pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE || pHead->msgType == TDMT_VND_DROP_TTL_TABLE); - // return + tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); SMqMetaRsp metaRsp = {0}; metaRsp.reqOffset = pReq->currentOffset; metaRsp.rspOffset = fetchOffset; @@ -387,6 +389,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->epoch = -1; pHandle->execHandle.subType = req.subType; + pHandle->fetchMeta = req.withMeta; pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); for (int32_t i = 0; i < 5; i++) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 286bcea820b61ae146dca47a1512928be3f331dd..91931c2fd81e4895eac2c9f134eb44efb648edb6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo { SReadHandle readHandle; uint64_t tableUid; // queried super table uid EStreamScanMode scanMode; - SOperatorInfo* pOperatorDumy; + SOperatorInfo* pSnapshotReadOp; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SArray* childIds; SessionWindowSupporter sessionSup; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d6893145ab08381a4bd6fecf06d9221cb329dc14..b521983d8e83a88962d871ab05372c4ee8143d38 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_ } for (int32_t i = 0; i < pBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i, data, (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + colDataAppend(pColInfoData, i, data, + (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); } if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && @@ -774,7 +775,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { if (!needRead) { return false; } - STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; + STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->curTWinIdx = 0; tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); @@ -819,11 +820,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { while (1) { SSDataBlock* pResult = NULL; - pResult = doTableScan(pInfo->pOperatorDumy); + pResult = doTableScan(pInfo->pSnapshotReadOp); if (pResult == NULL) { if (prepareDataScan(pInfo)) { // scan next window data - pResult = doTableScan(pInfo->pOperatorDumy); + pResult = doTableScan(pInfo->pSnapshotReadOp); } } if (!pResult) { @@ -857,11 +858,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa blockDataEnsureCapacity(pUpdateBlock, size); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); - pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); + pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId); int32_t i = 0; for (; i < size; i++) { rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); - uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); + uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId); if (pInfo->groupId != id) { break; } @@ -1061,7 +1062,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; } else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) { - SSDataBlock* pResult = doTableScan(pInfo->pOperatorDumy); + SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); if (pResult) { return pResult->info.rows > 0 ? pResult : NULL; } @@ -1133,7 +1134,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan } else { pInfo->pUpdateInfo = NULL; } - pInfo->pOperatorDumy = pTableScanDummy; + pInfo->pSnapshotReadOp = pTableScanDummy; pInfo->interval = pSTInfo->interval; pInfo->readHandle = *pHandle; @@ -1545,7 +1546,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { return NULL; } - int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE : TDMT_MND_SYSTABLE_RETRIEVE; + int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE + : TDMT_MND_SYSTABLE_RETRIEVE; pMsgSendInfo->param = pOperator; pMsgSendInfo->msgInfo.pData = buf1; @@ -1831,7 +1833,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { } else { data = (char*)p; } - colDataAppend(pDst, count, data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + colDataAppend(pDst, count, data, + (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && data != NULL) { @@ -1884,11 +1887,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi goto _error; } - pInfo->pTableList = pTableListInfo; - pInfo->pColMatchInfo = colList; - pInfo->pRes = createResDataBlock(pDescNode); - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; + pInfo->pTableList = pTableListInfo; + pInfo->pColMatchInfo = colList; + pInfo->pRes = createResDataBlock(pDescNode); + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 74f2b6a8efcc9b55dbc94eb0c9e27032997e2187..f365be906c322108f964aca1ec6ebcc794c82303 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2235,7 +2235,8 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) { } void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { - ASSERT(pDest->info.capacity >= pSource->info.rows); + // ASSERT(pDest->info.capacity >= pSource->info.rows); + blockDataEnsureCapacity(pDest, pSource->info.rows); clearUpdateDataBlock(pDest); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex);