提交 669f9d82 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

文件模式从 100644 更改为 100755
......@@ -154,7 +154,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
//
return queue->qItem;
}
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
......@@ -226,9 +229,7 @@ typedef struct {
int32_t nodeId;
int32_t childId;
int32_t taskId;
// int64_t checkpointVer;
// int64_t processedVer;
SEpSet epSet;
SEpSet epSet;
} SStreamChildEpInfo;
typedef struct {
......@@ -372,15 +373,6 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
return 0;
}
typedef struct {
int32_t reserved;
} SStreamTaskDeployRsp;
typedef struct {
// SMsgHead head;
SStreamTask* task;
} SStreamTaskDeployReq;
typedef struct {
SMsgHead head;
int64_t streamId;
......@@ -478,7 +470,18 @@ typedef struct {
} SStreamRecoverDownstreamRsp;
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t waitingRspCnt;
int32_t totReq;
SArray* info; // SArray<SArray<SStreamCheckpointInfo>*>
} SStreamRecoverStatus;
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
......@@ -504,7 +507,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb;
TTB* pStateDb;
SHashObj* pTasks;
SHashObj* pRecoveringState;
SHashObj* pRecoverStatus;
void* ahandle;
TXN txn;
FTaskExpand* expandFunc;
......
......@@ -47,8 +47,6 @@ typedef struct SRpcHandleInfo {
int8_t persistHandle; // persist handle or not
int8_t hasEpSet;
STraceId traceId;
// app info
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
......@@ -58,7 +56,8 @@ typedef struct SRpcHandleInfo {
void *rsp;
int32_t rspLen;
// conn info
STraceId traceId;
SRpcConnInfo conn;
} SRpcHandleInfo;
......
......@@ -396,7 +396,7 @@ typedef enum ELogicConditionType {
#ifdef WINDOWS
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
#else
#define TSDB_MAX_RPC_THREADS 5
#define TSDB_MAX_RPC_THREADS 10
#endif
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
......
......@@ -74,7 +74,7 @@ static const SSysDbTableSchema clusterSchema[] = {
static const SSysDbTableSchema userDBSchema[] = {
{.name = "name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "vgroups", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "replica", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "strict", .bytes = TSDB_DB_STRICT_STR_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
......
......@@ -1706,7 +1706,10 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) {
static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
int32_t *numOfTables = p1;
*numOfTables += pVgroup->numOfTables;
int64_t uid = *(int64_t*)p2;
if (pVgroup->dbUid == uid) {
*numOfTables += pVgroup->numOfTables;
}
return true;
}
......@@ -1747,7 +1750,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) == 0) {
int32_t numOfTables = 0;
sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, NULL, NULL);
sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, &pDb->uid, NULL);
mndDumpDbInfoData(pMnode, pBlock, pDb, pShow, numOfRows, numOfTables, false, objStatus, sysinfo);
numOfRows++;
}
......
......@@ -33,7 +33,7 @@ typedef struct {
static SStreamGlobalEnv streamEnv;
int32_t streamExec(SStreamTask* pTask);
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
int32_t streamDispatch(SStreamTask* pTask);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
......
......@@ -104,7 +104,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
int8_t status;
......@@ -136,7 +136,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
tFreeStreamDispatchReq(pReq);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}
......@@ -183,6 +182,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
pReq->upstreamTaskId);
streamTaskEnqueue(pTask, pReq, pRsp);
tFreeStreamDispatchReq(pReq);
if (exec) {
streamTryExec(pTask);
......@@ -246,24 +246,20 @@ int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq,
return 0;
}
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
if (pRsp->inputStatus == TASK_INPUT_STATUS__NORMAL) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
streamProcessRunReq(pTask);
int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
streamProcessRunReq(pTask);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
// scan data to recover
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
pTask->taskStatus = TASK_STATUS__RECOVERING;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
if (streamPipelineExec(pTask, 100) < 0) {
return -1;
}
} else {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->taskStatus = TASK_STATUS__NORMAL;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
// scan data to recover
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
pTask->taskStatus = TASK_STATUS__RECOVER_SELF;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
if (streamPipelineExec(pTask, 100, true) < 0) {
return -1;
}
} else {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->taskStatus = TASK_STATUS__NORMAL;
}
return 0;
......
......@@ -93,7 +93,7 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock
}
#endif
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
void* exec = pTask->exec.executor;
......@@ -125,24 +125,26 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
taosArrayDestroy(pRes);
break;
}
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return -1;
}
if (dispatch) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
qRes->childId = pTask->selfChildId;
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
qRes->childId = pTask->selfChildId;
if (streamTaskOutput(pTask, qRes) < 0) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return -1;
}
if (streamTaskOutput(pTask, qRes) < 0) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return -1;
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
}
}
......
......@@ -132,6 +132,49 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
return 0;
}
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
return 0;
}
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq) {
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
return 0;
}
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp) {
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
int32_t sz = taosArrayGetSize(pRsp->checkpointVer);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamCheckpointInfo* pInfo = taosArrayGet(pRsp->checkpointVer, i);
if (tEncodeSStreamCheckpointInfo(pEncoder, pInfo) < 0) return -1;
}
return 0;
}
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp) {
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
pRsp->checkpointVer = taosArrayInit(sz, sizeof(SStreamCheckpointInfo));
if (pRsp->checkpointVer == NULL) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamCheckpointInfo info;
if (tDecodeSStreamCheckpointInfo(pDecoder, &info) < 0) return -1;
taosArrayPush(pRsp->checkpointVer, &info);
}
return 0;
}
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
......@@ -223,25 +266,129 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
int32_t streamFetchDownstreamStatus(SStreamTask* pTask) {
int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) {
int32_t taskId = pVgInfo->taskId;
int32_t nodeId = pVgInfo->vgId;
SStreamRecoverDownstreamReq req = {
.streamId = pTask->taskId,
.downstreamTaskId = taskId,
.taskId = pTask->taskId,
};
int32_t tlen;
int32_t code;
tEncodeSize(tEncodeSStreamTaskRecoverReq, &req, tlen, code);
if (code < 0) {
return -1;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + tlen);
if (buf == NULL) {
return -1;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if (tEncodeSStreamTaskRecoverReq(&encoder, &req) < 0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
}
tEncoderClear(&encoder);
((SMsgHead*)buf)->vgId = htonl(nodeId);
SRpcMsg msg = {
.pCont = buf, .contLen = sizeof(SMsgHead) + tlen,
/*.msgType = */
};
tmsgSendReq(&pVgInfo->epSet, &msg);
return 0;
}
int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) {
// set self status to recover_phase1
// build fetch status msg
// send fetch msg
SStreamRecoverStatus* pRecover;
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
pRecover = taosHashGet(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t));
if (pRecover == NULL) {
pRecover = taosMemoryCalloc(1, sizeof(SStreamRecoverStatus));
if (pRecover == NULL) {
return -1;
}
pRecover->info = taosArrayInit(0, sizeof(void*));
if (pRecover->info == NULL) {
taosMemoryFree(pRecover);
return -1;
}
taosHashPut(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t), &pRecover, sizeof(void*));
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
pRecover->totReq = 1;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t numOfDownstream = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
pRecover->totReq = numOfDownstream;
for (int32_t i = 0; i < numOfDownstream; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pTask->shuffleDispatcher.dbInfo.pVgroupInfos, i);
streamFetchRecoverStatus(pTask, pVgInfo);
}
} else {
ASSERT(0);
}
return 0;
}
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, void* msg) {
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
// if failed, set timer and retry
// if successful
// add rsp state to partial recover hash
// if complete, begin actual recover
int32_t taskId = pTask->taskId;
SStreamRecoverStatus* pRecover = taosHashGet(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
if (pRecover == NULL) {
return -1;
}
taosArrayPush(pRecover->info, &pRsp->checkpointVer);
int32_t leftRsp = atomic_sub_fetch_32(&pRecover->waitingRspCnt, 1);
ASSERT(leftRsp >= 0);
if (leftRsp == 0) {
ASSERT(taosArrayGetSize(pRecover->info) == pRecover->totReq);
// srcNodeId -> SStreamCheckpointInfo*
SHashObj* pFinalChecks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (pFinalChecks == NULL) return -1;
for (int32_t i = 0; i < pRecover->totReq; i++) {
SArray* pChecks = taosArrayGetP(pRecover->info, i);
int32_t sz = taosArrayGetSize(pChecks);
for (int32_t j = 0; j < sz; j++) {
SStreamCheckpointInfo* pOneCheck = taosArrayGet(pChecks, j);
SStreamCheckpointInfo* pCheck = taosHashGet(pFinalChecks, &pOneCheck->srcNodeId, sizeof(int32_t));
if (pCheck == NULL) {
pCheck = taosMemoryCalloc(1, sizeof(SStreamCheckpointInfo));
pCheck->srcNodeId = pOneCheck->srcNodeId;
pCheck->srcChildId = pOneCheck->srcChildId;
pCheck->stateProcessedVer = pOneCheck->stateProcessedVer;
taosHashPut(pFinalChecks, &pCheck->srcNodeId, sizeof(int32_t), &pCheck, sizeof(void*));
} else {
pCheck->stateProcessedVer = TMIN(pCheck->stateProcessedVer, pOneCheck->stateProcessedVer);
}
}
}
// load local state
//
// recover
//
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
if (streamPipelineExec(pTask, 10000, true) < 0) {
return -1;
}
}
taosHashCleanup(pFinalChecks);
taosHashRemove(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
}
return 0;
}
......
......@@ -128,7 +128,7 @@ typedef struct {
int8_t retryCnt;
int8_t retryLimit;
// bool setMaxRetry;
STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
......@@ -195,17 +195,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define transLabel(trans) ((STrans*)trans)->label
// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
//
// int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
// bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transFreeMsg(void* msg);
//
typedef struct SConnBuffer {
char* buf;
......@@ -322,8 +312,8 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// request list
typedef struct STransReq {
queue q;
void* data;
queue q;
uv_write_t wreq;
} STransReq;
void transReqQueueInit(queue* q);
......
......@@ -16,7 +16,8 @@
#include "transComm.h"
typedef struct SConnList {
queue conn;
queue conn;
int32_t size;
} SConnList;
typedef struct SCliConn {
......@@ -339,8 +340,8 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
uv_timer_stop(conn->timer);
}
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
......@@ -510,7 +511,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
SHashObj* pPool = pool;
SConnList* plist = taosHashGet(pPool, key, strlen(key));
if (plist == NULL) {
SConnList list;
SConnList list = {0};
taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pPool, key, strlen(key));
QUEUE_INIT(&plist->conn);
......@@ -519,15 +520,18 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
if (QUEUE_IS_EMPTY(&plist->conn)) {
return NULL;
}
plist->size -= 1;
queue* h = QUEUE_HEAD(&plist->conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
}
return conn;
}
static void addConnToPool(void* pool, SCliConn* conn) {
......@@ -539,6 +543,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
allocConnRef(conn, true);
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(thrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
STrans* pTransInst = thrd->pTransInst;
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
......@@ -556,13 +567,17 @@ static void addConnToPool(void* pool, SCliConn* conn) {
assert(conn->list != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&conn->list->conn, &conn->q);
conn->list->size += 1;
conn->task = NULL;
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
if (conn->list->size >= 50) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
}
static int32_t allocConnRef(SCliConn* conn, bool update) {
if (update) {
......@@ -1374,7 +1389,7 @@ int transReleaseCliHandle(void* handle) {
}
STransMsg tmsg = {.info.handle = handle};
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
// TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
......@@ -1415,7 +1430,6 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
if (ctx != NULL) {
pCtx->appCtx = *ctx;
}
assert(pTransInst->connType == TAOS_CONN_CLIENT);
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx;
......
......@@ -23,33 +23,6 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt;
static int32_t instMgt;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
int ret = -1;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
return ret;
}
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
memcpy(pAuth, context.digest, sizeof(context.digest));
}
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
return false;
// SRpcHead* pHead = rpcHeadFromCont(pCont);
......@@ -176,7 +149,6 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
* info--->|
*/
SConnBuffer* p = connBuf;
uvBuf->base = p->buf + p->len;
if (p->left == -1) {
uvBuf->len = p->cap - p->len;
......@@ -184,7 +156,8 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
if (p->left < p->cap - p->len) {
uvBuf->len = p->left;
} else {
p->buf = taosMemoryRealloc(p->buf, p->left + p->len);
p->cap = p->left + p->len;
p->buf = taosMemoryRealloc(p->buf, p->cap);
uvBuf->base = p->buf + p->len;
uvBuf->len = p->left;
}
......@@ -266,14 +239,9 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
uv_async_t* async = &(pool->asyncs[idx]);
SAsyncItem* item = async->data;
int64_t st = taosGetTimestampUs();
taosThreadMutexLock(&item->mtx);
QUEUE_PUSH(&item->qmsg, q);
taosThreadMutexUnlock(&item->mtx);
int64_t el = taosGetTimestampUs() - st;
if (el > 50) {
// tInfo("lock and unlock cost:%d", (int)el);
}
return uv_async_send(async);
}
......@@ -349,30 +317,21 @@ void transReqQueueInit(queue* q) {
QUEUE_INIT(q);
}
void* transReqQueuePush(queue* q) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
wreq->data = req;
req->data = wreq;
QUEUE_PUSH(q, &wreq->q);
return req;
STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
req->wreq.data = req;
QUEUE_PUSH(q, &req->q);
return &req->wreq;
}
void* transReqQueueRemove(void* arg) {
void* ret = NULL;
uv_write_t* req = arg;
STransReq* wreq = req && req->data ? req->data : NULL;
assert(wreq->data == req);
if (wreq == NULL || wreq->data == NULL) {
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
return req;
}
uv_write_t* wreq = arg;
QUEUE_REMOVE(&wreq->q);
STransReq* req = wreq ? wreq->data : NULL;
if (req == NULL) return NULL;
QUEUE_REMOVE(&req->q);
ret = req && req->handle ? req->handle->data : NULL;
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
ret = wreq && wreq->handle ? wreq->handle->data : NULL;
taosMemoryFree(req);
return ret;
}
......@@ -381,7 +340,6 @@ void transReqQueueClear(queue* q) {
queue* h = QUEUE_HEAD(q);
QUEUE_REMOVE(h);
STransReq* req = QUEUE_DATA(h, STransReq, q);
taosMemoryFree(req->data);
taosMemoryFree(req);
}
}
......
......@@ -75,7 +75,6 @@ typedef struct SWorkThrd {
SAsyncPool* asyncPool;
uv_prepare_t* prepare;
queue msg;
TdThreadMutex msgMtx;
queue conn;
void* pTransInst;
......@@ -499,6 +498,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("unexcept occurred, continue");
continue;
}
// release handle to rpc init
if (msg->type == Quit) {
(*transAsyncHandle[msg->type])(msg, pThrd);
......@@ -743,7 +743,6 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
pThrd->pipe->data = pThrd;
QUEUE_INIT(&pThrd->msg);
taosThreadMutexInit(&pThrd->msgMtx, NULL);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
......
......@@ -75,15 +75,14 @@ void processShellMsg() {
void *handle = pRpcMsg->info.handle;
taosFreeQitem(pRpcMsg);
{
SRpcMsg nRpcMsg = {0};
nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.contLen = msgSize;
nRpcMsg.info.handle = handle;
nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
rpcSendResponse(&nRpcMsg);
}
//{
// SRpcMsg nRpcMsg = {0};
// nRpcMsg.pCont = rpcMallocCont(msgSize);
// nRpcMsg.contLen = msgSize;
// nRpcMsg.info.handle = handle;
// nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
// rpcSendResponse(&nRpcMsg);
//}
}
taosUpdateItemSize(qinfo.queue, numOfMsgs);
......
......@@ -44,11 +44,11 @@ typedef struct {
void (*fp)(void *);
} SRefSet;
static SRefSet tsRefSetList[TSDB_REF_OBJECTS];
static SRefSet tsRefSetList[TSDB_REF_OBJECTS];
static TdThreadOnce tsRefModuleInit = PTHREAD_ONCE_INIT;
static TdThreadMutex tsRefMutex;
static int32_t tsRefSetNum = 0;
static int32_t tsNextId = 0;
static int32_t tsRefSetNum = 0;
static int32_t tsNextId = 0;
static void taosInitRefModule(void);
static void taosLockList(int64_t *lockedBy);
......
......@@ -332,8 +332,8 @@
./test.sh -f tsim/vnode/stable_replica3_vnode3.sim
# --- sync
#./test.sh -f tsim/sync/3Replica1VgElect.sim
#./test.sh -f tsim/sync/3Replica5VgElect.sim
./test.sh -f tsim/sync/3Replica1VgElect.sim
./test.sh -f tsim/sync/3Replica5VgElect.sim
./test.sh -f tsim/sync/oneReplica1VgElect.sim
./test.sh -f tsim/sync/oneReplica5VgElect.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册