提交 329e257a 编写于 作者: dengyihao's avatar dengyihao

remove assert

上级 b2f306e0
......@@ -36,7 +36,6 @@ extern "C" {
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
do { \
type c = var; \
assert(sizeof(type) == sizeof(c)); \
memcpy((void *)buf, (void *)&c, sizeof(c)); \
buf += sizeof(c); \
} while (0)
......
......@@ -226,7 +226,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
indexDebug("w suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType);
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
assert(*cache != NULL);
ASSERTS(*cache != NULL, "index-cache already release");
if (*cache == NULL) return -1;
int ret = idxCachePut(*cache, p, uid);
if (ret != 0) {
return ret;
......
......@@ -170,7 +170,6 @@ TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t d
}
return tDoCompare(func, cmptype, &va, &vb);
}
assert(0);
return BREAK;
#endif
}
......@@ -367,7 +366,7 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) {
tlen = taosEncodeBinary(dst, src, strlen(src));
break;
default:
ASSERT(0);
ASSERTS(0, "index invalid input type");
break;
}
*dst = (char*)*dst - tlen;
......@@ -459,7 +458,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
*dst = (char*)*dst - tlen;
break;
default:
ASSERT(0);
ASSERTS(0, "index invalid input type");
break;
}
return tlen;
......
......@@ -206,7 +206,9 @@ static FORCE_INLINE int32_t sifGetValueFromNode(SNode *node, char **value) {
static FORCE_INLINE int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SOperatorNode *nd = (SOperatorNode *)node;
assert(nodeType(node) == QUERY_NODE_OPERATOR);
if (nodeType(node) != QUERY_NODE_OPERATOR) {
return -1;
}
SColumnNode *l = (SColumnNode *)nd->pLeft;
SValueNode *r = (SValueNode *)nd->pRight;
......
......@@ -65,10 +65,7 @@ void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes* nodes, bool isFinal) {
taosArrayPush(nodes->stack, &un);
}
FstBuilderNode* fstUnFinishedNodesPopRoot(FstUnFinishedNodes* nodes) {
assert(taosArrayGetSize(nodes->stack) == 1);
FstBuilderNodeUnfinished* un = taosArrayPop(nodes->stack);
assert(un->last == NULL);
return un->node;
}
......@@ -82,7 +79,6 @@ FstBuilderNode* fstUnFinishedNodesPopFreeze(FstUnFinishedNodes* nodes, CompiledA
FstBuilderNode* fstUnFinishedNodesPopEmpty(FstUnFinishedNodes* nodes) {
FstBuilderNodeUnfinished* un = taosArrayPop(nodes->stack);
assert(un->last == NULL);
return un->node;
}
void fstUnFinishedNodesSetRootOutput(FstUnFinishedNodes* nodes, Output out) {
......@@ -102,7 +98,8 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
}
int32_t sz = taosArrayGetSize(nodes->stack) - 1;
FstBuilderNodeUnfinished* un = taosArrayGet(nodes->stack, sz);
assert(un->last == NULL);
ASSERTS(un->last == NULL, "index-fst meet unexpected node");
if (un->last != NULL) return;
// FstLastTransition *trn = taosMemoryMalloc(sizeof(FstLastTransition));
// trn->inp = s->data[s->start];
......@@ -247,7 +244,6 @@ void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition*
}
void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode* node) {
int32_t sz = taosArrayGetSize(node->trans);
assert(sz <= 256);
uint8_t tSize = 0;
uint8_t oSize = packSize(node->finalOutput);
......@@ -322,7 +318,7 @@ void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode
// set_comm_input
void fstStateSetCommInput(FstState* s, uint8_t inp) {
assert(s->state == OneTransNext || s->state == OneTrans);
ASSERT(s->state == OneTransNext || s->state == OneTrans);
uint8_t val;
COMMON_INDEX(inp, 0b111111, val);
......@@ -331,7 +327,7 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) {
// comm_input
uint8_t fstStateCommInput(FstState* s, bool* null) {
assert(s->state == OneTransNext || s->state == OneTrans);
ASSERT(s->state == OneTransNext || s->state == OneTrans);
uint8_t v = s->val & 0b00111111;
if (v == 0) {
*null = true;
......@@ -344,7 +340,7 @@ uint8_t fstStateCommInput(FstState* s, bool* null) {
// input_len
uint64_t fstStateInputLen(FstState* s) {
assert(s->state == OneTransNext || s->state == OneTrans);
ASSERT(s->state == OneTransNext || s->state == OneTrans);
bool null = false;
fstStateCommInput(s, &null);
return null ? 1 : 0;
......@@ -352,11 +348,11 @@ uint64_t fstStateInputLen(FstState* s) {
// end_addr
uint64_t fstStateEndAddrForOneTransNext(FstState* s, FstSlice* data) {
assert(s->state == OneTransNext);
ASSERT(s->state == OneTransNext);
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s);
}
uint64_t fstStateEndAddrForOneTrans(FstState* s, FstSlice* data, PackSizes sizes) {
assert(s->state == OneTrans);
ASSERT(s->state == OneTrans);
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size
- FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes);
}
......@@ -370,7 +366,7 @@ uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice*
}
// input
uint8_t fstStateInput(FstState* s, FstNode* node) {
assert(s->state == OneTransNext || s->state == OneTrans);
ASSERT(s->state == OneTransNext || s->state == OneTrans);
FstSlice* slice = &node->data;
bool null = false;
uint8_t inp = fstStateCommInput(s, &null);
......@@ -378,7 +374,7 @@ uint8_t fstStateInput(FstState* s, FstNode* node) {
return null == false ? inp : data[node->start - 1];
}
uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
FstSlice* slice = &node->data;
uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size
......@@ -390,7 +386,7 @@ uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
// trans_addr
CompiledAddr fstStateTransAddr(FstState* s, FstNode* node) {
assert(s->state == OneTransNext || s->state == OneTrans);
ASSERT(s->state == OneTransNext || s->state == OneTrans);
FstSlice* slice = &node->data;
if (s->state == OneTransNext) {
return (CompiledAddr)(node->end) - 1;
......@@ -406,7 +402,7 @@ CompiledAddr fstStateTransAddr(FstState* s, FstNode* node) {
}
}
CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
FstSlice* slice = &node->data;
uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes);
......@@ -418,7 +414,7 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i
// sizes
PackSizes fstStateSizes(FstState* s, FstSlice* slice) {
assert(s->state == OneTrans || s->state == AnyTrans);
ASSERT(s->state == OneTrans || s->state == AnyTrans);
uint64_t i;
if (s->state == OneTrans) {
i = FST_SLICE_LEN(slice) - 1 - fstStateInputLen(s) - 1;
......@@ -431,7 +427,7 @@ PackSizes fstStateSizes(FstState* s, FstSlice* slice) {
}
// Output
Output fstStateOutput(FstState* s, FstNode* node) {
assert(s->state == OneTrans);
ASSERT(s->state == OneTrans);
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes);
if (oSizes == 0) {
......@@ -445,7 +441,7 @@ Output fstStateOutput(FstState* s, FstNode* node) {
return unpackUint64(data + i, oSizes);
}
Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes);
if (oSizes == 0) {
......@@ -462,19 +458,19 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
// anyTrans specify function
void fstStateSetFinalState(FstState* s, bool yes) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
if (yes) {
s->val |= 0b01000000;
}
return;
}
bool fstStateIsFinalState(FstState* s) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
return (s->val & 0b01000000) == 0b01000000;
}
void fstStateSetStateNtrans(FstState* s, uint8_t n) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
if (n <= 0b00111111) {
s->val = (s->val & 0b11000000) | n;
}
......@@ -482,7 +478,7 @@ void fstStateSetStateNtrans(FstState* s, uint8_t n) {
}
// state_ntrans
uint8_t fstStateStateNtrans(FstState* s, bool* null) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
*null = false;
uint8_t n = s->val & 0b00111111;
......@@ -492,16 +488,16 @@ uint8_t fstStateStateNtrans(FstState* s, bool* null) {
return n;
}
uint64_t fstStateTotalTransSize(FstState* s, uint64_t version, PackSizes sizes, uint64_t nTrans) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
uint64_t idxSize = fstStateTransIndexSize(s, version, nTrans);
return nTrans + (nTrans * FST_GET_TRANSITION_PACK_SIZE(sizes)) + idxSize;
}
uint64_t fstStateTransIndexSize(FstState* s, uint64_t version, uint64_t nTrans) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
return (version >= 2 && nTrans > TRANS_INDEX_THRESHOLD) ? 256 : 0;
}
uint64_t fstStateNtransLen(FstState* s) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
bool null = false;
fstStateStateNtrans(s, &null);
return null == true ? 1 : 0;
......@@ -530,7 +526,7 @@ Output fstStateFinalOutput(FstState* s, uint64_t version, FstSlice* slice, PackS
return unpackUint64(data + at, (uint8_t)oSizes);
}
uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) {
assert(s->state == AnyTrans);
ASSERT(s->state == AnyTrans);
FstSlice* slice = &node->data;
if (node->version >= 2 && node->nTrans > TRANS_INDEX_THRESHOLD) {
uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size
......@@ -676,17 +672,17 @@ bool fstNodeGetTransitionAddrAt(FstNode* node, uint64_t i, CompiledAddr* res) {
bool s = true;
FstState* st = &node->state;
if (st->state == OneTransNext) {
assert(i == 0);
ASSERT(i == 0);
fstStateTransAddr(st, node);
} else if (st->state == OneTrans) {
assert(i == 0);
ASSERT(i == 0);
fstStateTransAddr(st, node);
} else if (st->state == AnyTrans) {
fstStateTransAddrForAnyTrans(st, node, i);
} else if (FST_STATE_EMPTY_FINAL(node)) {
s = false;
} else {
assert(0);
ASSERT(0);
}
return s;
}
......@@ -722,7 +718,7 @@ bool fstNodeFindInput(FstNode* node, uint8_t b, uint64_t* res) {
bool fstNodeCompile(FstNode* node, void* w, CompiledAddr lastAddr, CompiledAddr addr, FstBuilderNode* builderNode) {
int32_t sz = taosArrayGetSize(builderNode->trans);
assert(sz < 256);
ASSERT(sz < 256);
if (sz == 0 && builderNode->isFinal && builderNode->finalOutput == 0) {
return true;
} else if (sz != 1 || builderNode->isFinal) {
......@@ -804,7 +800,7 @@ void fstBuilderInsertOutput(FstBuilder* b, FstSlice bs, Output in) {
uint64_t prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
if (prefixLen == FST_SLICE_LEN(s)) {
assert(out == 0);
ASSERT(out == 0);
return;
}
......@@ -848,7 +844,7 @@ void fstBuilderCompileFrom(FstBuilder* b, uint64_t istate) {
addr = fstBuilderCompile(b, bn);
fstBuilderNodeDestroy(bn);
assert(addr != NONE_ADDRESS);
ASSERT(addr != NONE_ADDRESS);
}
fstUnFinishedNodesTopLastFreeze(b->unfinished, addr);
return;
......
......@@ -104,8 +104,9 @@ bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet
DfaState *t = taosArrayGet(builder->dfa->states, state);
for (int i = 0; i < taosArrayGetSize(t->insts); i++) {
int32_t ip = *(int32_t *)taosArrayGet(t->insts, i);
bool succ = sparSetAdd(cur, ip, NULL);
assert(succ == true);
bool succ = sparSetAdd(cur, ip, NULL);
if (succ == false) return false;
}
dfaRun(builder->dfa, cur, next, byte);
......
......@@ -100,7 +100,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
do {
char key[1024] = {0};
assert(strlen(ctx->file.buf) + 1 + 64 < sizeof(key));
ASSERT(strlen(ctx->file.buf) + 1 + 64 < sizeof(key));
idxGenLRUKey(key, ctx->file.buf, blkId);
LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));
......@@ -114,7 +114,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
if (left < kBlockSize) {
nread = TMIN(left, len);
int32_t bytes = taosPReadFile(ctx->file.pFile, buf + total, nread, offset);
assert(bytes == nread);
ASSERTS(bytes == nread, "index read incomplete data");
if (bytes != nread) break;
total += bytes;
return total;
......@@ -124,7 +125,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
blk->blockId = blkId;
blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
assert(blk->nread <= kBlockSize);
ASSERTS(blk->nread <= kBlockSize, "index read incomplete data");
if (blk->nread > kBlockSize) break;
if (blk->nread < kBlockSize && blk->nread < len) {
taosMemoryFree(blk);
......@@ -275,7 +277,10 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
// update checksum
IFileCtx* ctx = write->wrt;
int nWrite = ctx->write(ctx, buf, len);
assert(nWrite == len);
ASSERTS(nWrite == len, "index write incomplete data");
if (nWrite != len) {
return -1;
}
write->count += len;
write->summer = taosCalcChecksum(write->summer, buf, len);
......@@ -302,7 +307,6 @@ int idxFileFlush(IdxFstFile* write) {
}
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
assert(1 <= nBytes && nBytes <= 8);
uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
for (uint8_t i = 0; i < nBytes; i++) {
buf[i] = (uint8_t)n;
......
......@@ -57,8 +57,8 @@ static void fstRegistryCellPromote(SArray* arr, uint32_t start, uint32_t end) {
if (start >= sz && end >= sz) {
return;
}
assert(start >= end);
ASSERTS(start >= end, "index-fst start lower than end");
if (start < end) return;
int32_t s = (int32_t)start;
int32_t e = (int32_t)end;
......
......@@ -101,7 +101,6 @@ FstSlice fstSliceDeepCopy(FstSlice* s, int32_t start, int32_t end) {
int32_t slen;
uint8_t* data = fstSliceData(s, &slen);
assert(tlen <= slen);
uint8_t* buf = taosMemoryMalloc(sizeof(uint8_t) * tlen);
memcpy(buf, data + start, tlen);
......
......@@ -122,7 +122,6 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
char buf[128] = {0};
int32_t sz = idxSerialCacheKey(&key, buf);
assert(sz < sizeof(buf));
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader);
}
......@@ -151,9 +150,8 @@ void tfileCacheDestroy(TFileCache* tcache) {
}
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
char buf[128] = {0};
int32_t sz = idxSerialCacheKey(key, buf);
assert(sz < sizeof(buf));
char buf[128] = {0};
int32_t sz = idxSerialCacheKey(key, buf);
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
if (reader == NULL || *reader == NULL) {
return NULL;
......@@ -877,7 +875,7 @@ static int tfileWriteFooter(TFileWriter* write) {
int nwrite = write->ctx->write(write->ctx, buf, (int32_t)strlen(buf));
indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx));
assert(nwrite == sizeof(FILE_MAGIC_NUMBER));
ASSERTS(nwrite == sizeof(FILE_MAGIC_NUMBER), "index write incomplete data");
return nwrite;
}
static int tfileReaderLoadHeader(TFileReader* reader) {
......@@ -892,7 +890,6 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
} else {
indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf);
}
// assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf));
return 0;
......@@ -914,7 +911,10 @@ static int tfileReaderLoadFst(TFileReader* reader) {
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
reader->header.fstOffset, fstSize, ctx->file.buf, size, cost);
// we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread <= fstSize);
ASSERTS(nread > 0 && nread <= fstSize, "index read incomplete fst");
if (nread <= 0 || nread > fstSize) {
return -1;
}
FstSlice st = fstSliceCreate((uint8_t*)buf, nread);
reader->fst = fstCreate(&st);
......@@ -929,7 +929,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
// add block cache
char block[4096] = {0};
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
assert(nread >= sizeof(uint32_t));
ASSERT(nread >= sizeof(uint32_t));
char* p = block;
int32_t nid = *(int32_t*)p;
......
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 1) /*
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 2) * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 3) *
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 4) * This program is free software: you can use, redistribute, and/or modify
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 5) * it under the terms of the GNU Affero General Public License, version 3
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 6) * or later ("AGPL"), as published by the Free Software Foundation.
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 7) *
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 8) * This program is distributed in the hope that it will be useful, but WITHOUT
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 9) * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 10) * FITNESS FOR A PARTICULAR PURPOSE.
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 11) *
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 12) * You should have received a copy of the GNU Affero General Public License
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 13) * along with this program. If not, see <http://www.gnu.org/licenses/>.
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 14) */
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 15)
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 16) #define _DEFAULT_SOURCE
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 17) #include "tmsgcb.h"
2f42c2e7933 source/common/src/tmsgcb.c (Shengliang Guan 2022-05-04 22:00:04 +0800 18) #include "taoserror.h"
363cbc8985d source/libs/transport/src/tmsgcb.c (Benguang Zhao 2022-11-18 09:37:58 +0800 19) #include "transLog.h"
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 20) #include "trpc.h"
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 21)
0164158256d source/common/src/tmsgcb.c (Shengliang Guan 2022-05-18 18:37:39 +0800 22) static SMsgCb defaultMsgCb;
ac6b121348d source/common/src/tmsgcb.c (Shengliang Guan 2022-03-29 13:39:55 +0800 23)
0164158256d source/common/src/tmsgcb.c (Shengliang Guan 2022-05-18 18:37:39 +0800 24) void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
ac6b121348d source/common/src/tmsgcb.c (Shengliang Guan 2022-03-29 13:39:55 +0800 25)
0164158256d source/common/src/tmsgcb.c (Shengliang Guan 2022-05-18 18:37:39 +0800 26) int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
363cbc8985d source/libs/transport/src/tmsgcb.c (Benguang Zhao 2022-11-18 09:37:58 +0800 27) ASSERT(msgcb != NULL);
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 28) int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg);
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 29) if (code != 0) {
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 30) rpcFreeCont(pMsg->pCont);
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 31) pMsg->pCont = NULL;
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 32) }
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 33) return code;
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 34) }
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 35)
0164158256d source/common/src/tmsgcb.c (Shengliang Guan 2022-05-18 18:37:39 +0800 36) int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
a9b32dc3276 source/common/src/tmsgcb.c (Shengliang Guan 2022-06-02 18:26:29 +0800 37) return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype);
b36356ae57a source/common/src/tmsgcb.c (Shengliang Guan 2022-03-22 15:53:29 +0800 38) }
b36356ae57a source/common/src/tmsgcb.c (Shengliang Guan 2022-03-22 15:53:29 +0800 39)
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 40) int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 41) int32_t code = (*defaultMsgCb.sendReqFp)(epSet, pMsg);
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 42) if (code != 0) {
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 43) rpcFreeCont(pMsg->pCont);
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 44) pMsg->pCont = NULL;
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 45) }
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 46) return code;
7afdd3603d3 source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-07-07 17:20:03 +0800 47) }
220fdfabe29 source/common/src/tmsgcb.c (Shengliang Guan 2022-03-21 19:08:25 +0800 48)
9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 49) void tmsgSendRsp(SRpcMsg* pMsg) {
9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 50) #if 1
9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 51) rpcSendResponse(pMsg);
9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 52) #else
9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 53) return (*defaultMsgCb.sendRspFp)(pMsg);
9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 54) #endif
9c426540e7e source/libs/transport/src/tmsgcb.c (Shengliang Guan 2022-11-18 20:15:13 +0800 55) }
f82afcfe4dd source/common/src/tmsgcb.c (Shengliang Guan 2022-03-28 20:45:52 +0800 56)
a9b32dc3276 source/common/src/tmsgcb.c (Shengliang Guan 2022-06-02 18:26:29 +0800 57) void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLinkArgFp)(pMsg); }
182a5ee4b5a source/common/src/tmsgcb.c (Shengliang Guan 2022-03-29 11:37:29 +0800 58)
a9b32dc3276 source/common/src/tmsgcb.c (Shengliang Guan 2022-06-02 18:26:29 +0800 59) void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.releaseHandleFp)(pHandle, type); }
7c588cc0747 source/common/src/tmsgcb.c (Shengliang Guan 2022-04-19 19:43:55 +0800 60)
226ccb4ec5a source/libs/transport/src/tmsgcb.c (Minglei Jin 2022-07-23 22:34:35 +0800 61) void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); }
......@@ -160,21 +160,12 @@ int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosRefHandle[type])(handle);
}
void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); }
void rpcUnrefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosUnRefHandle[type])(handle);
}
void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); }
int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
int rpcReleaseHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
return (*transReleaseHandle[type])(handle);
}
int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }
int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
// later
......
......@@ -651,7 +651,6 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return;
}
assert(nread <= 0);
if (nread == 0) {
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
......@@ -801,7 +800,11 @@ static void cliSendCb(uv_write_t* req, int status) {
}
void cliSend(SCliConn* pConn) {
assert(!transQueueEmpty(&pConn->cliMsgs));
bool empty = transQueueEmpty(&pConn->cliMsgs);
ASSERTS(empty == false, "trans-cli get invalid msg");
if (empty == true) {
return;
}
SCliMsg* pCliMsg = NULL;
CONN_GET_NEXT_SENDMSG(pConn);
......@@ -933,7 +936,6 @@ void cliConnCb(uv_connect_t* req, int status) {
transSockInfo2Str(&sockname, pConn->src);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle);
cliSend(pConn);
}
......@@ -1237,7 +1239,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
for (int i = 0; ahandle == 0 && i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
if (cliMsg->type == Release) {
assert(pMsg == NULL);
ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
return true;
}
}
......@@ -1665,7 +1667,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
int32_t code = pResp->code;
// return internal code app
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
pResp->code = pCtx->retryCode;
}
}
......
......@@ -134,7 +134,9 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
if (total >= HEADSIZE && !p->invalid) {
*buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);
transResetBuffer(connBuf);
if (transResetBuffer(connBuf) < 0) {
return -1;
}
} else {
total = -1;
}
......@@ -154,7 +156,8 @@ int transResetBuffer(SConnBuffer* connBuf) {
p->total = 0;
p->len = 0;
} else {
assert(0);
ASSERTS(0, "invalid read from sock buf");
return -1;
}
return 0;
}
......
......@@ -267,7 +267,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn->refId);
assert(transMsg.info.handle != NULL);
ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg");
if (transMsg.info.handle == NULL) {
return false;
}
if (pHead->noResp == 1) {
transMsg.info.refId = -1;
......@@ -718,8 +721,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return;
}
// free memory allocated by
assert(nread == strlen(notify));
assert(buf->base[0] == notify[0]);
ASSERTS(nread == strlen(notify), "trans-svr mem corrupted");
ASSERTS(buf->base[0] == notify[0], "trans-svr mem corrupted");
taosMemoryFree(buf->base);
SWorkThrd* pThrd = q->data;
......@@ -731,7 +734,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
}
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
SSvrConn* pConn = createConn(pThrd);
......@@ -971,19 +973,24 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
int ret = uv_pipe_init(srv->loop, pipe, 1);
assert(ret == 0);
ASSERTS(ret == 0, "trans-svr failed to init pipe");
if (ret != 0) return;
ret = uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe);
assert(ret == 0);
ASSERTS(ret == 0, "trans-svr failed to accept pipe msg");
if (ret != 0) return;
ret = uv_is_readable((uv_stream_t*)pipe);
assert(ret == 1);
ASSERTS(ret == 1, "trans-svr pipe status corrupted");
if (ret != 1) return;
ret = uv_is_writable((uv_stream_t*)pipe);
assert(ret == 1);
ASSERTS(ret == 1, "trans-svr pipe status corrupted");
if (ret != 0) return;
ret = uv_is_closing((uv_handle_t*)pipe);
assert(ret == 0);
ASSERTS(ret == 0, "trans-svr pipe status corrupted");
if (ret != 0) return;
srv->numOfWorkerReady++;
}
......@@ -1272,7 +1279,6 @@ int transSendResponse(const STransMsg* msg) {
SExHandle* exh = msg->info.handle;
int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId);
assert(refId != 0);
STransMsg tmsg = *msg;
tmsg.info.refId = refId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册