未验证 提交 6b04ee98 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16763 from taosdata/fix/handleSentWrite

fix(rpc): handle sent write
......@@ -416,20 +416,22 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe
me.ctbEntry.pTags = pReq->ctb.pTag;
#ifdef TAG_FILTER_DEBUG
SArray* pTagVals = NULL;
int32_t code = tTagToValArray((STag*)pReq->ctb.pTag, &pTagVals);
SArray *pTagVals = NULL;
int32_t code = tTagToValArray((STag *)pReq->ctb.pTag, &pTagVals);
for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
char* buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
memcpy(buf, pTagVal->pData, pTagVal->nData);
metaDebug("metaTag table:%s varchar index:%d cid:%d type:%d value:%s", pReq->name, i, pTagVal->cid, pTagVal->type, buf);
metaDebug("metaTag table:%s varchar index:%d cid:%d type:%d value:%s", pReq->name, i, pTagVal->cid,
pTagVal->type, buf);
taosMemoryFree(buf);
} else {
double val = 0;
GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
metaDebug("metaTag table:%s number index:%d cid:%d type:%d value:%f", pReq->name, i, pTagVal->cid, pTagVal->type, val);
metaDebug("metaTag table:%s number index:%d cid:%d type:%d value:%f", pReq->name, i, pTagVal->cid,
pTagVal->type, val);
}
}
#endif
......
......@@ -303,7 +303,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
buf = strndup(INDEX_DATA_NULL_STR, (int32_t)strlen(INDEX_DATA_NULL_STR));
len = (int32_t)strlen(INDEX_DATA_NULL_STR);
} else {
const char* emptyStr = " ";
static const char* emptyStr = " ";
buf = strndup(emptyStr, (int32_t)strlen(emptyStr));
len = (int32_t)strlen(emptyStr);
}
......@@ -585,6 +585,12 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
idxTRsltDestroy(tr);
int ret = idxGenTFile(sIdx, pCache, result);
if (ret != 0) {
indexError("failed to merge");
} else {
int64_t cost = taosGetTimestampUs() - st;
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
}
idxDestroyFinalRslt(result);
idxCacheDestroyImm(pCache);
......@@ -595,12 +601,6 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
tfileReaderUnRef(pReader);
idxCacheUnRef(pCache);
int64_t cost = taosGetTimestampUs() - st;
if (ret != 0) {
indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
} else {
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
}
atomic_store_32(&pCache->merging, 0);
if (quit) {
idxPost(sIdx);
......
......@@ -19,11 +19,12 @@
#include "tchecksum.h"
#include "tcoding.h"
static void fstPackDeltaIn(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
static FORCE_INLINE void fstPackDeltaIn(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr,
uint8_t nBytes) {
CompiledAddr deltaAddr = (transAddr == EMPTY_ADDRESS) ? EMPTY_ADDRESS : nodeAddr - transAddr;
idxFilePackUintIn(wrt, deltaAddr, nBytes);
}
static uint8_t fstPackDetla(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
static FORCE_INLINE uint8_t fstPackDetla(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) {
uint8_t nBytes = packDeltaSize(nodeAddr, transAddr);
fstPackDeltaIn(wrt, nodeAddr, transAddr, nBytes);
return nBytes;
......@@ -39,7 +40,7 @@ FstUnFinishedNodes* fstUnFinishedNodesCreate() {
fstUnFinishedNodesPushEmpty(nodes, false);
return nodes;
}
static void unFinishedNodeDestroyElem(void* elem) {
static void FORCE_INLINE unFinishedNodeDestroyElem(void* elem) {
FstBuilderNodeUnfinished* b = (FstBuilderNodeUnfinished*)elem;
fstBuilderNodeDestroy(b->node);
taosMemoryFree(b->last);
......
......@@ -30,14 +30,14 @@ typedef struct {
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
static void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
char* p = buf;
SERIALIZE_STR_VAR_TO_BUF(p, path, strlen(path));
SERIALIZE_VAR_TO_BUF(p, '_', char);
idxInt2str(blockId, p, 0);
return;
}
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
if (ctx->type == TFILE) {
int nwr = taosWriteFile(ctx->file.pFile, buf, len);
assert(nwr == len);
......@@ -47,7 +47,7 @@ static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
ctx->offset += len;
return len;
}
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
static FORCE_INLINE int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
int nRead = 0;
if (ctx->type == TFILE) {
#ifdef USE_MMAP
......@@ -111,7 +111,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
} while (len > 0);
return total;
}
static int idxFileCtxGetSize(IFileCtx* ctx) {
static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) {
if (ctx->type == TFILE) {
int64_t file_size = 0;
taosStatFile(ctx->file.buf, &file_size, NULL);
......@@ -119,7 +119,7 @@ static int idxFileCtxGetSize(IFileCtx* ctx) {
}
return 0;
}
static int idxFileCtxDoFlush(IFileCtx* ctx) {
static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
if (ctx->type == TFILE) {
taosFsyncFile(ctx->file.pFile);
} else {
......
......@@ -16,7 +16,7 @@
#include "indexFstRegistry.h"
#include "os.h"
uint64_t fstRegistryHash(FstRegistry* registry, FstBuilderNode* bNode) {
static FORCE_INLINE uint64_t fstRegistryHash(FstRegistry* registry, FstBuilderNode* bNode) {
// TODO(yihaoDeng): refactor later
const uint64_t FNV_PRIME = 1099511628211;
uint64_t h = 14695981039346656037u;
......
......@@ -15,7 +15,7 @@
#include "indexFstSparse.h"
static void sparSetUtil(int32_t *buf, int32_t cap) {
static FORCE_INLINE void sparSetInitBuf(int32_t *buf, int32_t cap) {
for (int32_t i = 0; i < cap; i++) {
buf[i] = -1;
}
......@@ -28,8 +28,8 @@ FstSparseSet *sparSetCreate(int32_t sz) {
ss->dense = (int32_t *)taosMemoryMalloc(sz * sizeof(int32_t));
ss->sparse = (int32_t *)taosMemoryMalloc(sz * sizeof(int32_t));
sparSetUtil(ss->dense, sz);
sparSetUtil(ss->sparse, sz);
sparSetInitBuf(ss->dense, sz);
sparSetInitBuf(ss->sparse, sz);
ss->cap = sz;
......@@ -90,7 +90,7 @@ void sparSetClear(FstSparseSet *ss) {
if (ss == NULL) {
return;
}
sparSetUtil(ss->dense, ss->cap);
sparSetUtil(ss->sparse, ss->cap);
sparSetInitBuf(ss->dense, ss->cap);
sparSetInitBuf(ss->sparse, ss->cap);
ss->size = 0;
}
......@@ -1034,7 +1034,8 @@ static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int
sprintf(filename, "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version);
return;
}
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version) {
static void FORCE_INLINE tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col,
int64_t version) {
char filename[128] = {0};
tfileGenFileName(filename, suid, col, version);
sprintf(fullname, "%s/%s", path, filename);
......
......@@ -21,7 +21,7 @@ typedef struct MergeIndex {
int len;
} MergeIndex;
static int iBinarySearch(SArray *arr, int s, int e, uint64_t k) {
static FORCE_INLINE int iBinarySearch(SArray *arr, int s, int e, uint64_t k) {
uint64_t v;
int32_t m;
while (s <= e) {
......
......@@ -80,6 +80,11 @@ IF(NOT TD_DARWIN)
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxJsonUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (idxTest
os
......@@ -102,11 +107,7 @@ IF(NOT TD_DARWIN)
gtest_main
index
)
target_include_directories (idxJsonUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (idxTest
os
util
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <string>
#include <thread>
#include "index.h"
#include "indexCache.h"
#include "indexFst.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
#include "indexUtil.h"
#include "tskiplist.h"
#include "tutil.h"
using namespace std;
static std::string logDir = TD_TMP_DIR_PATH "log";
static void initLog() {
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
idxDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
struct WriteBatch {
SIndexMultiTerm *terms;
};
class Idx {
public:
Idx(int _cacheSize = 1024 * 1024 * 4, const char *_path = "tindex") {
opts.cacheSize = _cacheSize;
path += TD_TMP_DIR_PATH;
path += _path;
}
int SetUp(bool remove) {
initLog();
if (remove) taosRemoveDir(path.c_str());
int ret = indexJsonOpen(&opts, path.c_str(), &index);
return ret;
}
int Write(WriteBatch *batch, uint64_t uid) {
// write batch
indexJsonPut(index, batch->terms, uid);
return 0;
}
int Read(const char *json, void *key, int64_t *id) {
// read batch
return 0;
}
void TearDown() { indexJsonClose(index); }
std::string path;
SIndexOpts opts;
SIndex *index;
};
SIndexTerm *indexTermCreateT(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char *colName,
int32_t nColName, const char *colVal, int32_t nColVal) {
char buf[256] = {0};
int16_t sz = nColVal;
memcpy(buf, (uint16_t *)&sz, 2);
memcpy(buf + 2, colVal, nColVal);
if (colType == TSDB_DATA_TYPE_BINARY) {
return indexTermCreate(suid, oper, colType, colName, nColName, buf, sizeof(buf));
} else {
return indexTermCreate(suid, oper, colType, colName, nColName, colVal, nColVal);
}
return NULL;
}
int initWriteBatch(WriteBatch *wb, int batchSize) {
SIndexMultiTerm *terms = indexMultiTermCreate();
std::string colName;
std::string colVal;
for (int i = 0; i < 64; i++) {
colName += '0' + i;
colVal += '0' + i;
}
for (int i = 0; i < batchSize; i++) {
colVal[i % colVal.size()] = '0' + i % 128;
colName[i % colName.size()] = '0' + i % 128;
SIndexTerm *term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermAdd(terms, term);
}
wb->terms = terms;
return 0;
}
int BenchWrite(Idx *idx, int batchSize, int limit) {
for (int i = 0; i < limit; i += batchSize) {
WriteBatch wb;
idx->Write(&wb, i);
}
return 0;
}
int BenchRead(Idx *idx) { return 0; }
int main() {
// Idx *idx = new Idx;
// if (idx->SetUp(true) != 0) {
// std::cout << "failed to setup index" << std::endl;
// return 0;
// } else {
// std::cout << "succ to setup index" << std::endl;
// }
// BenchWrite(idx, 100, 10000);
return 1;
}
......@@ -271,20 +271,20 @@ void validateFst() {
}
delete m;
}
static std::string logDir = TD_TMP_DIR_PATH "log";
static void initLog() {
const char* defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
idxDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
static std::string logDir = TD_TMP_DIR_PATH "log";
static void initLog() {
const char* defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
idxDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
class IndexEnv : public ::testing::Test {
......
......@@ -52,7 +52,7 @@ void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM);
}
pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
}
......@@ -139,13 +139,15 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3
}
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
if (NULL == nodeInfo) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS;
}
......@@ -314,7 +316,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
if (!schMgmt.cfg.enableReSchedule) {
return TSDB_CODE_SUCCESS;
}
if (SCH_IS_DATA_BIND_TASK(pTask)) {
return TSDB_CODE_SUCCESS;
}
......@@ -341,7 +343,8 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
}
if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) {
SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, pTask->maxExecTimes, pTask->execId);
SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes,
pTask->maxExecTimes, pTask->execId);
schHandleJobFailure(pJob, rspCode);
return TSDB_CODE_SUCCESS;
}
......@@ -548,7 +551,8 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes, pTask->maxRetryTimes);
SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes,
pTask->maxRetryTimes);
return TSDB_CODE_SUCCESS;
}
......@@ -564,25 +568,25 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS;
}
/*
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
return TSDB_CODE_SUCCESS;
}
} else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
/*
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
return TSDB_CODE_SUCCESS;
}
} else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS;
if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS;
}
}
}
*/
*/
*needRetry = true;
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));
......@@ -630,8 +634,9 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, naddr->epSet.inUse, naddr->epSet.numOfEps,
SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId,
naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn,
SCH_GET_CUR_EP(naddr)->port);
++addNum;
}
......@@ -711,10 +716,10 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
if (candidateNum <= 1) {
goto _return;
}
switch (schMgmt.cfg.schPolicy) {
case SCH_LOAD_SEQ:
case SCH_ALL:
case SCH_ALL:
default:
if (++pTask->candidateIdx >= candidateNum) {
pTask->candidateIdx = 0;
......@@ -732,7 +737,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
_return:
SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS;
}
......@@ -759,7 +764,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
return;
}
int32_t i = 0;
int32_t i = 0;
SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
while (nodeInfo) {
if (nodeInfo->handle) {
......@@ -821,16 +826,16 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = schAcquireJob(pCtx->jobRid);
SSchJob *pJob = schAcquireJob(pCtx->jobRid);
if (NULL == pJob) {
taosMemoryFree(param);
qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
taosMemoryFree(param);
SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING);
}
SSchTask *pTask = pCtx->pTask;
int8_t status = 0;
int32_t code = 0;
int8_t status = 0;
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++;
......@@ -891,13 +896,12 @@ _return:
SCH_RET(code);
}
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
if (NULL == param) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
param->jobRid = pJob->refId;
param->pTask = pTask;
......@@ -906,7 +910,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
} else {
SCH_ERR_RET(schLaunchTaskImpl(param));
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -162,7 +162,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
static void cliReleaseUnfinishedMsg(SCliConn* conn) {
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* msg = transQueueGet(&conn->cliMsgs, i);
if (msg != NULL && msg->ctx != NULL) {
if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
conn->ctx.freeFunc(msg->ctx->ahandle);
}
......@@ -196,22 +196,22 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \
int i = 0, sz = transQueueSize(&conn->cliMsgs); \
for (; i < sz; i++) { \
pMsg = transQueueGet(&conn->cliMsgs, i); \
if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
break; \
} \
} \
if (i == sz) { \
pMsg = NULL; \
tDebug("msg not found, %" PRIu64 "", ahandle); \
} else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \
tDebug("msg found, %" PRIu64 "", ahandle); \
} \
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \
int i = 0, sz = transQueueSize(&conn->cliMsgs); \
for (; i < sz; i++) { \
pMsg = transQueueGet(&conn->cliMsgs, i); \
if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
break; \
} \
} \
if (i == sz) { \
pMsg = NULL; \
tDebug("msg not found, %" PRIu64 "", ahandle); \
} else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \
tDebug("msg found, %" PRIu64 "", ahandle); \
} \
} while (0)
#define CONN_GET_NEXT_SENDMSG(conn) \
do { \
......@@ -289,7 +289,12 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pCliMsg = NULL;
CONN_GET_NEXT_SENDMSG(conn);
cliSend(conn);
if (pCliMsg == NULL)
return false;
else {
cliSend(conn);
return true;
}
}
return false;
_RETURN:
......@@ -376,8 +381,10 @@ void cliHandleResp(SCliConn* conn) {
return;
}
if (cliAppCb(conn, &transMsg, pMsg) != 0) {
return;
if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
if (cliAppCb(conn, &transMsg, pMsg) != 0) {
return;
}
}
destroyCmsg(pMsg);
......@@ -425,18 +432,20 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
transMsg.info.ahandle);
}
} else {
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
transMsg.info.ahandle = (pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL;
}
if (pCtx == NULL || pCtx->pSem == NULL) {
if (transMsg.info.ahandle == NULL) {
if (REQUEST_NO_RESP(&pMsg->msg)) destroyCmsg(pMsg);
if (REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) destroyCmsg(pMsg);
once = true;
continue;
}
}
if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
return;
if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
return;
}
}
destroyCmsg(pMsg);
tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
......@@ -702,6 +711,9 @@ static bool cliHandleNoResp(SCliConn* conn) {
if (cliMaySendCachedMsg(conn) == false) {
SCliThrd* thrd = conn->hostThrd;
addConnToPool(thrd->pool, conn);
res = false;
} else {
res = true;
}
}
}
......@@ -779,7 +791,13 @@ void cliSend(SCliConn* pConn) {
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
if (status != 0) {
tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
uv_err_name(status));
cliHandleExcept(pConn);
}
return;
_RETURN:
return;
......@@ -928,7 +946,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
// persist conn already release by server
STransMsg resp;
cliBuildExceptResp(pMsg, &resp);
pTransInst->cfp(pTransInst->parent, &resp, NULL);
if (pMsg->type != Release) {
pTransInst->cfp(pTransInst->parent, &resp, NULL);
}
destroyCmsg(pMsg);
return;
}
......@@ -1399,53 +1419,57 @@ void transUnrefCliHandle(void* handle) {
cliDestroyConn((SCliConn*)handle, true);
}
}
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) {
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) {
SCliThrd* pThrd = NULL;
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh == NULL) {
return NULL;
}
*validHandle = true;
if (exh->pThrd == NULL && trans != NULL) {
int idx = cliRBChoseIdx(trans);
if (idx < 0) return NULL;
exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
}
pThrd = exh->pThrd;
transReleaseExHandle(transGetRefMgt(), handle);
return pThrd;
}
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) {
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
if (handle == 0) {
int idx = cliRBChoseIdx(trans);
if (idx < 0) return NULL;
return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
}
SCliThrd* pThrd = transGetWorkThrdFromHandle(handle, validHandle);
if (*validHandle == true && pThrd == NULL) {
int idx = cliRBChoseIdx(trans);
if (idx < 0) return NULL;
pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
}
SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
return pThrd;
}
int transReleaseCliHandle(void* handle) {
int idx = -1;
bool valid = false;
SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid);
SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
if (pThrd == NULL) {
return -1;
}
STransMsg tmsg = {.info.handle = handle};
STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->ahandle = tmsg.info.ahandle;
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
cmsg->type = Release;
cmsg->ctx = pCtx;
STraceId* trace = &tmsg.info.traceId;
tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
taosMemoryFree(cmsg);
destroyCmsg(cmsg);
return -1;
}
return 0;
......@@ -1458,9 +1482,8 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
return -1;
}
bool valid = false;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid);
if (pThrd == NULL && valid == false) {
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
......@@ -1503,9 +1526,8 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
return -1;
}
bool valid = false;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid);
if (pThrd == NULL && valid == false) {
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
......@@ -1589,6 +1611,7 @@ int64_t transAllocHandle() {
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->refId = transAddExHandle(transGetRefMgt(), exh);
tDebug("pre alloc refId %" PRId64 "", exh->refId);
return exh->refId;
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册