提交 9f5e4351 编写于 作者: wmmhello's avatar wmmhello

fix:conflict from main

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG d11f210 GIT_TAG 04296a5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -1116,6 +1116,7 @@ _failed: ...@@ -1116,6 +1116,7 @@ _failed:
} }
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
void* buf = NULL; void* buf = NULL;
...@@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0; int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 40) { if (retryCnt++ > MAX_RETRY_COUNT) {
goto FAIL; goto FAIL;
} }
...@@ -1811,7 +1812,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1811,7 +1812,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&pRspWrapper); taosGetQitem(tmq->qall, (void**)&pRspWrapper);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
return NULL; return NULL;
} }
...@@ -1831,7 +1831,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1831,7 +1831,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) { if (pDataRsp->head.epoch == consumerEpoch) {
// todo fix it: race condition
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
// update the epset // update the epset
...@@ -1843,6 +1842,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1843,6 +1842,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
// update the local offset value only for the returned values.
pVg->currentOffset = pDataRsp->rspOffset; pVg->currentOffset = pDataRsp->rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
......
...@@ -109,23 +109,18 @@ typedef struct { ...@@ -109,23 +109,18 @@ typedef struct {
} STqPushEntry; } STqPushEntry;
struct STQ { struct STQ {
SVnode* pVnode; SVnode* pVnode;
char* path; char* path;
int64_t walLogLastVer; int64_t walLogLastVer;
SRWLatch lock;
SRWLatch pushLock; SHashObj* pPushMgr; // consumerId -> STqPushEntry
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pPushMgr; // consumerId -> STqPushEntry SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore; STqOffsetStore* pOffsetStore;
TDB* pMetaDB;
TDB* pMetaDB; TTB* pExecStore;
TTB* pExecStore; TTB* pCheckStore;
TTB* pCheckStore; SStreamMeta* pStreamMeta;
SStreamMeta* pStreamMeta;
}; };
typedef struct { typedef struct {
...@@ -163,7 +158,7 @@ typedef struct { ...@@ -163,7 +158,7 @@ typedef struct {
int32_t size; int32_t size;
} STqOffsetHead; } STqOffsetHead;
STqOffsetStore* tqOffsetOpen(); STqOffsetStore* tqOffsetOpen(STQ* pTq);
void tqOffsetClose(STqOffsetStore*); void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
......
...@@ -51,7 +51,7 @@ void tqCleanUp() { ...@@ -51,7 +51,7 @@ void tqCleanUp() {
} }
} }
static void destroySTqHandle(void* data) { static void destroyTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data; STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task); qDestroyTask(pData->execHandle.task);
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
...@@ -89,9 +89,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -89,9 +89,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
taosInitRWLatch(&pTq->pushLock); taosInitRWLatch(&pTq->lock);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
...@@ -236,38 +236,6 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { ...@@ -236,38 +236,6 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
} }
#endif #endif
// int32_t len = 0;
// int32_t code = 0;
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
// if (code < 0) {
// return -1;
// }
//
// int32_t tlen = sizeof(SMqRspHead) + len;
// void* buf = rpcMallocCont(tlen);
// if (buf == NULL) {
// return -1;
// }
//
// memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
// SEncoder encoder = {0};
// tEncoderInit(&encoder, abuf, len);
// tEncodeSMqDataRsp(&encoder, pRsp);
// tEncoderClear(&encoder);
//
// SRpcMsg rsp = {
// .info = pPushEntry->pInfo,
// .pCont = buf,
// .contLen = tlen,
// .code = 0,
// };
//
// tmsgSendRsp(&rsp);
//
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
...@@ -444,7 +412,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -444,7 +412,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal); tFormatOffset(formatBuf, 80, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.", tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
consumerId, pHandle->subKey, vgId, formatBuf); consumerId, pHandle->subKey, vgId, formatBuf);
return 0; return 0;
} else { } else {
...@@ -502,7 +470,45 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -502,7 +470,45 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0; return 0;
} }
static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
int32_t code = 0;
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->lock);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
taosWUnLockLatch(&pTq->lock);
return code;
}
taosWUnLockLatch(&pTq->lock);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64 ", reqId:0x%" PRIx64,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts, pRequest->reqId);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1; int32_t code = -1;
STqOffsetVal offset = {0}; STqOffsetVal offset = {0};
SWalCkHead* pCkHead = NULL; SWalCkHead* pCkHead = NULL;
...@@ -512,9 +518,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ...@@ -512,9 +518,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
uint64_t consumerId = pRequest->consumerId; uint64_t consumerId = pRequest->consumerId;
// 1. reset the offset if needed // 1. reset the offset if needed
if (reqOffset.type > 0) { if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
offset = reqOffset; // handle the reset offset cases, according to the consumer's choice.
} else { // handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false; bool blockReturned = false;
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned); code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) { if (code != 0) {
...@@ -525,38 +530,14 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ...@@ -525,38 +530,14 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (blockReturned) { if (blockReturned) {
return 0; return 0;
} }
} else { // use the consumer specified offset
// the offset value can not be monotonious increase??
offset = reqOffset;
} }
// this is a normal subscription requirement // this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0}; return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->pushLock);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, &offset);
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
taosWUnLockLatch(&pTq->pushLock);
return code;
}
taosWUnLockLatch(&pTq->pushLock);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64", reqId:0x%"PRIx64,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts, pRequest->reqId);
tDeleteSMqDataRsp(&dataRsp);
return code;
} }
// todo handle the case where re-balance occurs. // todo handle the case where re-balance occurs.
...@@ -573,7 +554,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ...@@ -573,7 +554,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (metaRsp.metaRspLen > 0) { if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
",ts:%" PRId64, ",ts:%" PRId64,
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.ts); metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp); taosMemoryFree(metaRsp.metaRsp);
...@@ -590,7 +571,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ...@@ -590,7 +571,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
} }
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",version:%" PRId64, ",version:%" PRId64,
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version); taosxRsp.rspOffset.version);
} }
...@@ -610,7 +591,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ...@@ -610,7 +591,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
int32_t savedEpoch = atomic_load_32(&pHandle->epoch); int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) { if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d", ", found new consumer epoch %d, discard req epoch %d",
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
break; break;
} }
...@@ -708,31 +689,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -708,31 +689,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
// 2. check re-balance status // 2. check re-balance status
taosRLockLatch(&pTq->pushLock); taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->pushLock); taosRUnLockLatch(&pTq->lock);
return -1; return -1;
} }
taosRUnLockLatch(&pTq->pushLock); taosRUnLockLatch(&pTq->lock);
taosWLockLatch(&pTq->pushLock);
// 3. update the epoch value // 3. update the epoch value
taosWLockLatch(&pTq->lock);
int32_t savedEpoch = pHandle->epoch; int32_t savedEpoch = pHandle->epoch;
if (savedEpoch < reqEpoch) { if (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
pHandle->epoch = reqEpoch; pHandle->epoch = reqEpoch;
} }
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &reqOffset); tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
return extractDataForMq(pTq, pHandle, &req, pMsg); return doPollDataForMq(pTq, pHandle, &req, pMsg);
} }
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
...@@ -740,12 +721,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -740,12 +721,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->lock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) { if (code != 0) {
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
} }
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) { if (pHandle) {
...@@ -809,18 +790,18 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -809,18 +790,18 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SVnode* pVnode = pTq->pVnode; SVnode* pVnode = pTq->pVnode;
int32_t vgId = TD_VID(pVnode); int32_t vgId = TD_VID(pVnode);
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId); req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "", tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
} }
if (req.newConsumerId == -1) { if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return 0; return 0;
} }
...@@ -910,28 +891,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -910,28 +891,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_add_fetch_32(&pHandle->epoch, 1); atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return tqMetaSaveHandle(pTq, req.subKey, pHandle); return tqMetaSaveHandle(pTq, req.subKey, pHandle);
} } else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
req.newConsumerId);
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->lock);
atomic_store_32(&pHandle->epoch, -1); atomic_store_32(&pHandle->epoch, -1);
// remove if it has been register in the push manager, and return one empty block to consumer // remove if it has been register in the push manager, and return one empty block to consumer
tqRemovePushEntry(pTq, req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true); tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1); atomic_add_fetch_32(&pHandle->epoch, 1);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pHandle->execHandle.task); qStreamCloseTsdbReader(pHandle->execHandle.task);
} }
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return -1; return -1;
}
} }
} }
......
...@@ -213,7 +213,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -213,7 +213,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost // lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->lock);
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) { if (numOfRegisteredPush > 0) {
...@@ -231,7 +231,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -231,7 +231,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosArrayDestroy(cachedKeyLens); taosArrayDestroy(cachedKeyLens);
// unlock // unlock
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
return -1; return -1;
} }
...@@ -320,7 +320,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -320,7 +320,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosMemoryFree(data); taosMemoryFree(data);
} }
// unlock // unlock
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
} }
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
......
...@@ -4507,6 +4507,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ ...@@ -4507,6 +4507,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg); int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg); taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
size++;
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
...@@ -4519,10 +4520,21 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ ...@@ -4519,10 +4520,21 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg); taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
i += 1;
size++;
} }
j += 1; j += 1;
} }
} }
while (j < numOfCols) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
i += 1;
}
j++;
}
} }
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) { int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
...@@ -4602,8 +4614,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, ...@@ -4602,8 +4614,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
} else if (pAgg->colId < pSup->colId[j]) { } else if (pAgg->colId < pSup->colId[j]) {
i += 1; i += 1;
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
// ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); pResBlock->pBlockAgg[pSup->slotId[j]] = NULL;
pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; *allHave = false;
j += 1; j += 1;
} }
} }
...@@ -4996,4 +5008,4 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti ...@@ -4996,4 +5008,4 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
taosMemoryFreeClear(pReader->idStr); taosMemoryFreeClear(pReader->idStr);
pReader->idStr = taosStrdup(idstr); pReader->idStr = taosStrdup(idstr);
} }
\ No newline at end of file
...@@ -444,7 +444,7 @@ static int32_t getInsTagsTableTargetNameFromOp(int32_t acctId, SOperatorNode* pO ...@@ -444,7 +444,7 @@ static int32_t getInsTagsTableTargetNameFromOp(int32_t acctId, SOperatorNode* pO
} else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) { } else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) {
pVal = (SValueNode*)pOper->pRight; pVal = (SValueNode*)pOper->pRight;
} }
if (NULL == pCol || NULL == pVal) { if (NULL == pCol || NULL == pVal || NULL == pVal->literal || 0 == strcmp(pVal->literal, "")) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -871,6 +871,7 @@ ...@@ -871,6 +871,7 @@
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/query/partitionby.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim ,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
$dbPrefix = m_in_db
$tbPrefix = m_in_tb
$mtPrefix = m_in_mt
$tbNum = 1
$rowNum = 200
$totalNum = 400
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database if exists $db
sql create database $db vgroups 1 maxrows 200 minrows 10;
sql use $db
sql create table $mt (ts timestamp, f1 int, f2 float) TAGS(tgcol int)
print ====== start create child tables and insert data
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , NULL , $x )
$x = $x + 1
endw
$i = $i + 1
endw
$i = 1
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x , NULL )
$x = $x + 1
endw
sql flush database $db
print =============== step2
$i = 0
$tb = $tbPrefix . $i
sql select max(f1) from $tb
if $rows != 1 then
return -1
endi
if $data00 != NULL then
return -1
endi
$i = 1
$tb = $tbPrefix . $i
sql select max(f2) from $tb
if $rows != 1 then
return -1
endi
if $data00 != NULL then
return -1
endi
$rowNum = 10
print ====== insert more data
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481700000 + $cc
sql insert into $tb values ($ms , $x , $x )
$x = $x + 1
endw
$i = $i + 1
endw
$i = 1
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481700000 + $cc
sql insert into $tb values ($ms , $x , $x )
$x = $x + 1
endw
sql flush database $db
print =============== step3
$i = 0
$tb = $tbPrefix . $i
sql select max(f1) from $tb
if $rows != 1 then
return -1
endi
if $data00 != 9 then
return -1
endi
$i = 1
$tb = $tbPrefix . $i
sql select max(f2) from $tb
if $rows != 1 then
return -1
endi
if $data00 != 9.00000 then
print $data00
return -1
endi
print =============== clear
#sql drop database $db
#sql select * from information_schema.ins_databases
#if $rows != 0 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -51,6 +51,11 @@ if $data00 != @ins_stables@ then ...@@ -51,6 +51,11 @@ if $data00 != @ins_stables@ then
return -1 return -1
endi endi
sql select * from information_schema.ins_tables where table_name='';
if $rows != 0 then
return -1
endi
sql select tbname from information_schema.ins_tables; sql select tbname from information_schema.ins_tables;
print $rows $data00 print $rows $data00
if $rows != 33 then if $rows != 33 then
......
...@@ -541,7 +541,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t ...@@ -541,7 +541,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*" PRIu64, width, *((uint64_t *)val)); printf("%*" PRIu64, width, *((uint64_t *)val));
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
printf("%*.5f", width, GET_FLOAT_VAL(val)); printf("%*ef", width, GET_FLOAT_VAL(val));
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册