提交 433286c0 编写于 作者: S shenglian zhou

Merge branch '3.0' of github.com:taosdata/TDengine into szhou/feature/merge-interval

......@@ -116,6 +116,8 @@ typedef struct SQueryTableDataCond {
int32_t type; // data block load type:
int32_t numOfTWindows;
STimeWindow* twindows;
int32_t startVersion;
int32_t endVersion;
} SQueryTableDataCond;
void* blockDataDestroy(SSDataBlock* pBlock);
......
......@@ -1341,6 +1341,13 @@ typedef struct {
int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
typedef struct {
int32_t vgId;
} SSplitVgroupReq;
int32_t tSerializeSSplitVgroupReq(void* buf, int32_t bufLen, SSplitVgroupReq* pReq);
int32_t tDeserializeSSplitVgroupReq(void* buf, int32_t bufLen, SSplitVgroupReq* pReq);
typedef struct {
char user[TSDB_USER_LEN];
char spi;
......@@ -2494,15 +2501,15 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq
int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
typedef struct {
int8_t intervalUnit;
int8_t slidingUnit;
int64_t interval;
int64_t offset;
int64_t sliding;
int64_t dstTbUid;
int32_t dstVgId; // for stream
SEpSet epSet;
char* expr;
int8_t intervalUnit;
int8_t slidingUnit;
int64_t interval;
int64_t offset;
int64_t sliding;
int64_t dstTbUid;
int32_t dstVgId; // for stream
SEpSet epSet;
char* expr;
} STableIndexInfo;
typedef struct {
......@@ -2511,7 +2518,7 @@ typedef struct {
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
void tFreeSTableIndexInfo(void *pInfo);
void tFreeSTableIndexInfo(void* pInfo);
typedef struct {
int8_t mqMsgType;
......
......@@ -157,6 +157,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
......
......@@ -192,67 +192,68 @@
#define TK_VGROUP 174
#define TK_MERGE 175
#define TK_REDISTRIBUTE 176
#define TK_SYNCDB 177
#define TK_DELETE 178
#define TK_NULL 179
#define TK_NK_QUESTION 180
#define TK_NK_ARROW 181
#define TK_ROWTS 182
#define TK_TBNAME 183
#define TK_QSTARTTS 184
#define TK_QENDTS 185
#define TK_WSTARTTS 186
#define TK_WENDTS 187
#define TK_WDURATION 188
#define TK_CAST 189
#define TK_NOW 190
#define TK_TODAY 191
#define TK_TIMEZONE 192
#define TK_COUNT 193
#define TK_FIRST 194
#define TK_LAST 195
#define TK_LAST_ROW 196
#define TK_BETWEEN 197
#define TK_IS 198
#define TK_NK_LT 199
#define TK_NK_GT 200
#define TK_NK_LE 201
#define TK_NK_GE 202
#define TK_NK_NE 203
#define TK_MATCH 204
#define TK_NMATCH 205
#define TK_CONTAINS 206
#define TK_JOIN 207
#define TK_INNER 208
#define TK_SELECT 209
#define TK_DISTINCT 210
#define TK_WHERE 211
#define TK_PARTITION 212
#define TK_BY 213
#define TK_SESSION 214
#define TK_STATE_WINDOW 215
#define TK_SLIDING 216
#define TK_FILL 217
#define TK_VALUE 218
#define TK_NONE 219
#define TK_PREV 220
#define TK_LINEAR 221
#define TK_NEXT 222
#define TK_HAVING 223
#define TK_ORDER 224
#define TK_SLIMIT 225
#define TK_SOFFSET 226
#define TK_LIMIT 227
#define TK_OFFSET 228
#define TK_ASC 229
#define TK_NULLS 230
#define TK_ID 231
#define TK_NK_BITNOT 232
#define TK_INSERT 233
#define TK_VALUES 234
#define TK_IMPORT 235
#define TK_NK_SEMI 236
#define TK_FILE 237
#define TK_SPLIT 177
#define TK_SYNCDB 178
#define TK_DELETE 179
#define TK_NULL 180
#define TK_NK_QUESTION 181
#define TK_NK_ARROW 182
#define TK_ROWTS 183
#define TK_TBNAME 184
#define TK_QSTARTTS 185
#define TK_QENDTS 186
#define TK_WSTARTTS 187
#define TK_WENDTS 188
#define TK_WDURATION 189
#define TK_CAST 190
#define TK_NOW 191
#define TK_TODAY 192
#define TK_TIMEZONE 193
#define TK_COUNT 194
#define TK_FIRST 195
#define TK_LAST 196
#define TK_LAST_ROW 197
#define TK_BETWEEN 198
#define TK_IS 199
#define TK_NK_LT 200
#define TK_NK_GT 201
#define TK_NK_LE 202
#define TK_NK_GE 203
#define TK_NK_NE 204
#define TK_MATCH 205
#define TK_NMATCH 206
#define TK_CONTAINS 207
#define TK_JOIN 208
#define TK_INNER 209
#define TK_SELECT 210
#define TK_DISTINCT 211
#define TK_WHERE 212
#define TK_PARTITION 213
#define TK_BY 214
#define TK_SESSION 215
#define TK_STATE_WINDOW 216
#define TK_SLIDING 217
#define TK_FILL 218
#define TK_VALUE 219
#define TK_NONE 220
#define TK_PREV 221
#define TK_LINEAR 222
#define TK_NEXT 223
#define TK_HAVING 224
#define TK_ORDER 225
#define TK_SLIMIT 226
#define TK_SOFFSET 227
#define TK_LIMIT 228
#define TK_OFFSET 229
#define TK_ASC 230
#define TK_NULLS 231
#define TK_ID 232
#define TK_NK_BITNOT 233
#define TK_INSERT 234
#define TK_VALUES 235
#define TK_IMPORT 236
#define TK_NK_SEMI 237
#define TK_FILE 238
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -692,6 +692,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -1219,6 +1219,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
pBlock->info.rowSize = pDataBlock->info.rowSize;
pBlock->info.groupId = pDataBlock->info.groupId;
pBlock->info.childId = pDataBlock->info.childId;
pBlock->info.type = pDataBlock->info.type;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
......@@ -1499,6 +1501,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
int32_t colNum = pDataBlock->info.numOfCols;
int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
for (int32_t k = 0; k < colNum; k++) {
......
......@@ -2419,7 +2419,7 @@ int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pR
return 0;
}
int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo* pInfo) {
int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo *pInfo) {
if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1;
if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1;
......@@ -2441,7 +2441,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo* pInfo = (STableIndexInfo*)taosArrayGet(pRsp->pIndex, i);
STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
}
}
......@@ -2491,12 +2491,12 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
return 0;
}
void tFreeSTableIndexInfo(void* info) {
void tFreeSTableIndexInfo(void *info) {
if (NULL == info) {
return;
}
STableIndexInfo *pInfo = (STableIndexInfo*)info;
STableIndexInfo *pInfo = (STableIndexInfo *)info;
taosMemoryFree(pInfo->expr);
}
......@@ -3448,6 +3448,31 @@ int32_t tDeserializeSRedistributeVgroupReq(void *buf, int32_t bufLen, SRedistrib
return 0;
}
int32_t tSerializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......
......@@ -484,6 +484,7 @@ typedef struct {
int64_t stbUid;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*>
char dbName[TSDB_DB_FNAME_LEN];
} SMqSubscribeObj;
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
......
......@@ -35,7 +35,7 @@
#define MND_CONSUMER_LOST_HB_CNT 3
static int8_t mqRebLock = 0;
static int8_t mqRebInExecCnt = 0;
static const char *mndConsumerStatusName(int status);
......@@ -76,15 +76,15 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
bool mndRebTryStart() {
int8_t old = atomic_val_compare_exchange_8(&mqRebLock, 0, 1);
int8_t old = atomic_val_compare_exchange_8(&mqRebInExecCnt, 0, 1);
return old == 0;
}
void mndRebEnd() { atomic_sub_fetch_8(&mqRebLock, 1); }
void mndRebEnd() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
void mndRebCntInc() { atomic_add_fetch_8(&mqRebLock, 1); }
void mndRebCntInc() { atomic_add_fetch_8(&mqRebInExecCnt, 1); }
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebLock, 1); }
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebInExecCnt, 1); }
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
......@@ -92,7 +92,6 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
ASSERT(pConsumer);
mInfo("receive consumer lost msg, consumer id %ld, status %s", pLostMsg->consumerId,
mndConsumerStatusName(pConsumer->status));
......@@ -106,7 +105,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) goto FAIL;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
......@@ -125,6 +124,14 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
ASSERT(pConsumer);
mInfo("receive consumer recover msg, consumer id %ld, status %s", pRecoverMsg->consumerId,
mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER;
......@@ -844,10 +851,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
topicSz = 1;
}
if (numOfRows + topicSz > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
}
for (int32_t i = 0; i < topicSz; i++) {
if (numOfRows + topicSz > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
}
SColumnInfoData *pColInfo;
int32_t cols = 0;
......
......@@ -415,6 +415,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
}
pSubNew->unassignedVgs = taosArrayDeepCopy(pSub->unassignedVgs, (FCopy)tCloneSMqVgEp);
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
return pSubNew;
}
......@@ -445,6 +446,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
}
ASSERT(cnt == sz);
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
tlen += taosEncodeString(buf, pSub->dbName);
return tlen;
}
......@@ -467,6 +469,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
}
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
buf = taosDecodeStringTo(buf, pSub->dbName);
return (void *)buf;
}
......
......@@ -402,7 +402,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg);
mndTransSetDbName(pTrans, pOutput->pSub->dbName);
if (pTrans == NULL) return -1;
// make txn:
......@@ -547,6 +548,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosRLockLatch(&pTopic->lock);
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0);
taosRUnLockLatch(&pTopic->lock);
......
......@@ -87,30 +87,22 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
SNode *pAst = NULL;
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
ASSERT(0);
return false;
return -1;
}
SHashObj *pColHash = NULL;
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) goto NEXT;
if (pColHash == NULL) {
pColHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
}
if (pCol->colId > 0) {
taosHashPut(pColHash, &pCol->colId, sizeof(int16_t), NULL, 0);
if (pCol->colId > 0 && pCol->colId == colId) {
found = true;
goto NEXT;
}
mTrace("topic:%s, colId:%d is used", pTopic->name, pCol->colId);
}
if (taosHashGet(pColHash, &colId, sizeof(int16_t)) != NULL) {
found = true;
goto NEXT;
}
NEXT:
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
......@@ -563,7 +555,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer %ld from cgroup %s", dropReq.name,
mError("topic:%s, failed to drop since subscribed by consumer %ld in consumer group %s", dropReq.name,
pConsumer->consumerId, pConsumer->cgroup);
return -1;
}
......@@ -580,7 +572,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
#endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
mndTransSetDbName(pTrans, pTopic->db);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1;
......
......@@ -89,7 +89,7 @@ typedef struct SMetaFltParam {
tb_uid_t suid;
int16_t cid;
int16_t type;
char * val;
char *val;
bool reverse;
int (*filterFunc)(void *a, void *b, int16_t type);
......@@ -116,16 +116,16 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId);
tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
void *pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
void * tsdbGetIdx(SMeta *pMeta);
void * tsdbGetIvtIdx(SMeta *pMeta);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
......@@ -201,7 +201,7 @@ struct SMetaEntry {
int64_t version;
int8_t type;
tb_uid_t uid;
char * name;
char *name;
union {
struct {
SSchemaWrapper schemaRow;
......@@ -229,17 +229,17 @@ struct SMetaEntry {
struct SMetaReader {
int32_t flags;
SMeta * pMeta;
SMeta *pMeta;
SDecoder coder;
SMetaEntry me;
void * pBuf;
void *pBuf;
int32_t szBuf;
};
struct SMTbCursor {
TBC * pDbc;
void * pKey;
void * pVal;
TBC *pDbc;
void *pKey;
void *pVal;
int32_t kLen;
int32_t vLen;
SMetaReader mr;
......
......@@ -119,8 +119,8 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
......
......@@ -500,8 +500,8 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
return TSDB_CODE_SUCCESS;
}
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId) {
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) {
return NULL;
......@@ -642,7 +642,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL
return NULL;
}
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pList, qId, taskId);
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId);
if (pTsdbReadHandle == NULL) {
return NULL;
}
......
......@@ -2060,9 +2060,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
if (pCtx[j].increase) {
int64_t ts = *(int64_t*) in;
int64_t ts = *(int64_t*)in;
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char *)&ts, pCtx[j].resultInfo->isNullRes);
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
ts++;
}
} else {
......@@ -3162,8 +3162,8 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SSortedMergeOperatorInfo *pInfo) {
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
blockDataCleanup(pDataBlock);
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
......@@ -4735,8 +4735,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
pOptr =
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
......@@ -4754,10 +4755,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 8;
qDebug("[******]create Semi");
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
int32_t children = 0;
qDebug("[******]create Final");
int32_t children = 1;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
......@@ -5132,7 +5135,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto _error;
}
return tsdbQueryTables(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
return tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
_error:
terrno = code;
......@@ -5414,7 +5417,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size) {
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
size_t size) {
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
......@@ -5435,7 +5439,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
bufSize = pageSize * 4;
}
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
for(int32_t i = 0; i < numOfOutput; ++i) {
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf;
}
return code;
......
......@@ -809,11 +809,18 @@ static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSD
// return p;
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
colInfoDataEnsureCapacity(pCol, 0, size);
blockDataEnsureCapacity(pUpdateBlock, size);
for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->tsArray, i);
colDataAppend(pCol, i, (char*)pTs, false);
}
for (int32_t i = 0; i < pUpdateBlock->info.numOfCols; i++) {
if (i == pInfo->primaryTsIndex) {
continue;
}
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, i);
colDataAppendNNULL(pCol, 0, size);
}
pUpdateBlock->info.rows = size;
pUpdateBlock->info.type = STREAM_REPROCESS;
blockDataUpdateTsWindow(pUpdateBlock, 0);
......@@ -841,7 +848,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
int32_t current = pInfo->validBlockIndex++;
return taosArrayGetP(pInfo->pBlockLists, current);
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
blockDataUpdateTsWindow(pBlock, 0);
return pBlock;
} else {
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes);
......@@ -940,7 +949,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
doFilter(pInfo->pCondition, pInfo->pRes, false);
blockDataUpdateTsWindow(pInfo->pRes, 0);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
break;
}
......
......@@ -1926,6 +1926,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) {
break;
......@@ -2004,10 +2005,7 @@ static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_
}
static int32_t getChildIndex(SSDataBlock* pBlock) {
// if (pBlock->info.type != STREAM_INVALID && pBlock->info.rows < 4) { // for test
// return pBlock->info.rows - 1;
// }
return 0;
return pBlock->info.childId;
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
......
......@@ -200,6 +200,21 @@ bool fmIsInvertible(int32_t funcId) {
return res;
}
static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[64] = {0};
if (NULL != gFunMgtService.pFuncNameHashTable) {
return fmGetFuncInfo(pFunc, msg, sizeof(msg));
}
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
if (0 == strcmp(funcMgtBuiltins[i].name, pFunc->functionName)) {
pFunc->funcId = i;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, msg, sizeof(msg));
}
}
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
}
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
......@@ -207,8 +222,8 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis
}
strcpy(pFunc->functionName, pName);
pFunc->pParameterList = pParameterList;
char msg[64] = {0};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc, msg, sizeof(msg))) {
if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) {
pFunc->pParameterList = NULL;
nodesDestroyNode(pFunc);
return NULL;
}
......
......@@ -127,9 +127,11 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
int64_t delta = v - pBucket->range.i64MinVal;
index = (delta % pBucket->numOfSlots);
} else {
double slotSpan = (double)span / pBucket->numOfSlots;
index = (int32_t)((v - pBucket->range.i64MinVal) / slotSpan);
if (v == pBucket->range.i64MaxVal) {
double slotSpan = ((double)span) / pBucket->numOfSlots;
uint64_t delta = v - pBucket->range.i64MinVal;
index = (int32_t)(delta / slotSpan);
if (v == pBucket->range.i64MaxVal || index == pBucket->numOfSlots) {
index -= 1;
}
}
......@@ -324,7 +326,6 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
int32_t bytes = pBucket->bytes;
for (int32_t i = 0; i < size; ++i) {
char *d = (char *) data + i * bytes;
int32_t index = (pBucket->hashFunc)(pBucket, d);
if (index < 0) {
continue;
......
......@@ -678,6 +678,7 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
}
static const char* jkMergeLogicPlanMergeKeys = "MergeKeys";
static const char* jkMergeLogicPlanInputs = "Inputs";
static const char* jkMergeLogicPlanNumOfChannels = "NumOfChannels";
static const char* jkMergeLogicPlanSrcGroupId = "SrcGroupId";
......@@ -688,6 +689,9 @@ static int32_t logicMergeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergeLogicPlanMergeKeys, pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergeLogicPlanInputs, pNode->pInputs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanNumOfChannels, pNode->numOfChannels);
}
......@@ -705,6 +709,9 @@ static int32_t jsonToLogicMergeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergeLogicPlanMergeKeys, &pNode->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergeLogicPlanInputs, &pNode->pInputs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergeLogicPlanNumOfChannels, &pNode->numOfChannels);
}
......
......@@ -472,7 +472,7 @@ cmd ::= KILL TRANSACTION NK_INTEGER(A).
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
//cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
%type dnode_list { SNodeList* }
%destructor dnode_list { nodesDestroyList($$); }
......
......@@ -248,6 +248,9 @@ static int32_t collectMetaKeyFromCreateIndex(SCollectMetaKeyCxt* pCxt, SCreateIn
code =
reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pCxt->pMetaCache);
}
}
return code;
}
......
......@@ -174,7 +174,7 @@ static SKeyword keywordTable[] = {
{"SNODE", TK_SNODE},
{"SNODES", TK_SNODES},
{"SOFFSET", TK_SOFFSET},
// {"SPLIT", TK_SPLIT},
{"SPLIT", TK_SPLIT},
{"STABLE", TK_STABLE},
{"STABLES", TK_STABLES},
{"STATE", TK_STATE},
......
......@@ -824,9 +824,9 @@ static EDealRes translateComparisonOperator(STranslateContext* pCxt, SOperatorNo
}
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
SNodeListNode* pRight = (SNodeListNode*)pOp->pRight;
bool first = true;
SDataType targetDt = {0};
SNode* pNode = NULL;
bool first = true;
SDataType targetDt = {0};
SNode* pNode = NULL;
FOREACH(pNode, pRight->pNodeList) {
SDataType dt = ((SExprNode*)pNode)->resType;
if (first) {
......@@ -3672,6 +3672,11 @@ static int32_t translateRedistributeVgroup(STranslateContext* pCxt, SRedistribut
return code;
}
static int32_t translateSplitVgroup(STranslateContext* pCxt, SSplitVgroupStmt* pStmt) {
SSplitVgroupReq req = {.vgId = pStmt->vgId};
return buildCmdMsg(pCxt, TDMT_MND_SPLIT_VGROUP, (FSerializeFunc)tSerializeSSplitVgroupReq, &req);
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
......@@ -3803,6 +3808,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
code = translateRedistributeVgroup(pCxt, (SRedistributeVgroupStmt*)pNode);
break;
case QUERY_NODE_SPLIT_VGROUP_STMT:
code = translateSplitVgroup(pCxt, (SSplitVgroupStmt*)pNode);
break;
default:
break;
}
......
此差异已折叠。
......@@ -267,10 +267,12 @@ TEST_F(ParserInitialCTest, createFunction) {
// run("CREATE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS './build/lib/libudf2.so' OUTPUTTYPE DOUBLE BUFSIZE 8");
}
TEST_F(ParserInitialCTest, createIndexSma) {
TEST_F(ParserInitialCTest, createSmaIndex) {
useDb("root", "test");
run("CREATE SMA INDEX index1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
run("CREATE SMA INDEX index2 ON st1 FUNCTION(MAX(c1), MIN(tag1)) INTERVAL(10s)");
}
TEST_F(ParserInitialCTest, createMnode) {
......
......@@ -19,7 +19,7 @@ using namespace std;
namespace ParserTest {
class ParserShowToUseTest : public ParserTestBase {};
class ParserShowToUseTest : public ParserDdlTest {};
// todo SHOW accounts
// todo SHOW apps
......@@ -133,7 +133,24 @@ TEST_F(ParserShowToUseTest, showVgroups) {
// todo SHOW vnodes
// todo split vgroup
TEST_F(ParserShowToUseTest, splitVgroup) {
useDb("root", "test");
SSplitVgroupReq expect = {0};
auto setSplitVgroupReqFunc = [&](int32_t vgId) { expect.vgId = vgId; };
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_SPLIT_VGROUP_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_SPLIT_VGROUP);
SSplitVgroupReq req = {0};
ASSERT_EQ(tDeserializeSSplitVgroupReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(req.vgId, expect.vgId);
});
setSplitVgroupReqFunc(15);
run("SPLIT VGROUP 15");
}
TEST_F(ParserShowToUseTest, useDatabase) {
useDb("root", "test");
......
......@@ -135,7 +135,8 @@ typedef struct SStableSplitInfo {
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
SNode* pFunc = NULL;
FOREACH(pFunc, pFuncs) {
if (!fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
!fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
return true;
}
}
......@@ -314,7 +315,12 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
int32_t code = TSDB_CODE_SUCCESS;
pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
// NULL == pSubplan means 'merge node' replaces 'split node'.
if (NULL == pSubplan) {
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
} else {
pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
}
if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -340,6 +346,21 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
return code;
}
static int32_t stbSplCreateMergeKeysForInterval(SNode* pWStartTs, SNodeList** pMergeKeys) {
SOrderByExprNode* pMergeKey = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pMergeKey) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMergeKey->pExpr = nodesCloneNode(pWStartTs);
if (NULL == pMergeKey->pExpr) {
nodesDestroyNode(pMergeKey);
return TSDB_CODE_OUT_OF_MEMORY;
}
pMergeKey->order = ORDER_ASC;
pMergeKey->nullOrder = NULL_ORDER_FIRST;
return nodesListMakeStrictAppend(pMergeKeys, pMergeKey);
}
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
......@@ -347,7 +368,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
SNodeList* pMergeKeys = NULL;
code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk));
code = stbSplCreateMergeKeysForInterval(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
}
......
......@@ -58,4 +58,6 @@ TEST_F(PlanIntervalTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1 INTERVAL(10s)");
run("SELECT _WSTARTTS, COUNT(*) FROM st1 INTERVAL(10s)");
}
......@@ -38,5 +38,5 @@ TEST_F(PlanSuperTableTest, pseudoColOnChildTable) {
TEST_F(PlanSuperTableTest, orderBy) {
useDb("root", "test");
run("SELECT -1*c1, c1 FROM st1 ORDER BY -1*c1");
run("SELECT -1 * c1, c1 FROM st1 ORDER BY -1 * c1");
}
......@@ -1097,15 +1097,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
if (index == -1) {
index = cliRBChoseIdx(pTransInst);
}
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0);
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
pCtx->hThrdIdx = index;
pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
pCtx->pSem = sem;
pCtx->pRsp = pRsp;
tsem_init(pCtx->pSem, 0, 0);
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx;
......@@ -1118,10 +1119,9 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem;
tsem_wait(pSem);
tsem_destroy(pSem);
taosMemoryFree(pSem);
tsem_wait(sem);
tsem_destroy(sem);
taosMemoryFree(sem);
}
/*
......
......@@ -226,7 +226,7 @@ static bool addHandleToAcceptloop(void* arg);
} else { \
refId = exh1->refId; \
} \
} else if (refId == -1) { \
} else if (refId < 0) { \
tTrace("server handle step3"); \
goto _return2; \
} \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册