提交 f86ab16b 编写于 作者: H Haojun Liao

[td-255] refactor for experiment purpose.

上级 5e5fe11f
......@@ -92,7 +92,7 @@ typedef struct SMergeTsCtx {
}SMergeTsCtx;
typedef struct SVgroupTableInfo {
SVgroupInfo vgInfo;
SVgroupMsg vgInfo;
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
......@@ -288,7 +288,11 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
#if 0
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
#endif
/**
* The create object function must be successful expect for the out of memory issue.
*
......
......@@ -8685,7 +8685,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
if (p->vgroupIdList != NULL) {
size_t s = taosArrayGetSize(p->vgroupIdList);
size_t vgroupsz = sizeof(SVgroupInfo) * s + sizeof(SVgroupsInfo);
size_t vgroupsz = sizeof(SVgroupMsg) * s + sizeof(SVgroupsInfo);
pTableMetaInfo->vgroupList = calloc(1, vgroupsz);
if (pTableMetaInfo->vgroupList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -8700,14 +8700,14 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
taosHashGetClone(tscVgroupMap, id, sizeof(*id), NULL, &existVgroupInfo);
assert(existVgroupInfo.inUse >= 0);
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j];
pVgroup->numOfEps = existVgroupInfo.numOfEps;
pVgroup->vgId = existVgroupInfo.vgId;
for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) {
pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port;
pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN);
}
SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[j];
memcpy(pVgroup, &existVgroupInfo, sizeof(SVgroupMsg));
// pVgroup->numOfEps = existVgroupInfo.numOfEps;
// pVgroup->vgId = existVgroupInfo.vgId;
// for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) {
// pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port;
// pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN);
// }
}
}
}
......
......@@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) {
return ret;
}
static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) {
assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
// Issue the query to one of the vnode among a vgroup randomly.
......@@ -93,6 +93,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
existed = true;
}
}
assert(existed);
}
......@@ -723,7 +724,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab
int32_t index = pTableMetaInfo->vgroupIndex;
assert(index >= 0);
SVgroupInfo* pVgroupInfo = NULL;
SVgroupMsg* pVgroupInfo = NULL;
if (pTableMetaInfo->vgroupList && pTableMetaInfo->vgroupList->numOfVgroups > 0) {
assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
......@@ -880,6 +881,10 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = NULL;
STableMeta *pTableMeta = NULL;
STableMetaInfo *pTableMetaInfo = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t size = tscEstimateQueryMsgSize(pSql);
......@@ -888,9 +893,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_OPERATION; // todo add test for this
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
pQueryInfo = tscGetQueryInfo(pCmd);
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pTableMeta = pTableMetaInfo->pTableMeta;
SQueryAttr query = {{0}};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
......@@ -2146,7 +2151,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
*size = (int32_t)(sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg));
size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
size_t vgroupsz = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
SVgroupsInfo *pVgroupInfo = calloc(1, vgroupsz);
assert(pVgroupInfo != NULL);
......@@ -2156,7 +2161,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
} else {
for (int32_t j = 0; j < pVgroupInfo->numOfVgroups; ++j) {
// just init, no need to lock
SVgroupInfo *pVgroup = &pVgroupInfo->vgroups[j];
SVgroupMsg *pVgroup = &pVgroupInfo->vgroups[j];
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
vmsg->vgId = htonl(vmsg->vgId);
......@@ -2168,7 +2173,8 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
pVgroup->vgId = vmsg->vgId;
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
pVgroup->epAddr[k].port = vmsg->epAddr[k].port;
pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
tstrncpy(pVgroup->epAddr[k].fqdn, vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
// pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
}
doUpdateVgroupInfo(pVgroup->vgId, vmsg);
......
......@@ -588,8 +588,8 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]);
tscBuildAndSendRequest(pSql, NULL);
return false;
// tscBuildAndSendRequest(pSql, NULL);
// return false;
}
return true;
......
......@@ -623,13 +623,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
// set the tag column id for executor to extract correct tag value
#ifndef _TD_NINGSI_60
pExpr->base.param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
#else
pExpr->base.param[0].i64 = colId;
pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT;
pExpr->base.param[0].nLen = sizeof(int64_t);
#endif
tVariant* pVariant = &pExpr->base.param[0];
pVariant->i64 = colId;
pVariant->nType = TSDB_DATA_TYPE_BIGINT;
pVariant->nLen = sizeof(int64_t);
pExpr->base.numOfParams = 1;
}
......@@ -748,10 +747,12 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
SVgroupTableInfo info = {{0}};
for (int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (tt->vgId == pvg->vgroups[m].vgId) {
tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
memcpy(&info.vgInfo, &pvg->vgroups[m], sizeof(info.vgInfo));
// tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
break;
}
}
assert(info.vgInfo.numOfEps != 0);
vgTables = taosArrayInit(4, sizeof(STableIdInfo));
......@@ -2459,7 +2460,7 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
tfree(p);
}
static void doConcurrentlySendSubQueries(SSqlObj* pSql) {
static UNUSED_FUNC void doConcurrentlySendSubQueries(SSqlObj* pSql) {
SSubqueryState *pState = &pSql->subState;
// concurrently sent the query requests.
......@@ -2550,13 +2551,14 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->pExtMemBuffer = pMemoryBuf;
trs->pOrderDescriptor = pDesc;
trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
trs->localBuffer = (tFilePage *)malloc(nBufferSize + sizeof(tFilePage));
if (trs->localBuffer == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs);
break;
}
trs->localBuffer->num = 0;
trs->subqueryIndex = i;
trs->pParentSql = pSql;
......@@ -2577,6 +2579,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self,
trs->subqueryIndex);
tfree(trs->localBuffer);
tfree(trs);
}
if (i < pState->numOfSub) {
......@@ -2594,7 +2599,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return pRes->code;
}
doConcurrentlySendSubQueries(pSql);
pSql->fp(pSql->param, pSql, 0);
// doConcurrentlySendSubQueries(pSql);
return TSDB_CODE_SUCCESS;
}
......@@ -2651,7 +2657,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
int32_t subqueryIndex = trsupport->subqueryIndex;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
......@@ -2929,7 +2935,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSubqueryState* pState = &pParentSql->subState;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
SVgroupMsg *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
......@@ -3057,7 +3063,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
assert(pQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
// stable query killed or other subquery failed, all query stopped
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
......@@ -3403,7 +3409,6 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
return;
}
// tscRestoreFuncForSTableQuery(pQueryInfo);
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
assert(numOfRes * rowSize > 0);
......
......@@ -3362,11 +3362,11 @@ void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
#if 0
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
tfree(pInfo->vgInfo.epAddr[j].fqdn);
}
#endif
taosArrayDestroy(pInfo->itemList);
}
......@@ -3380,9 +3380,9 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
assert(size > index);
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
tfree(pInfo->vgInfo.epAddr[j].fqdn);
}
// for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
// tfree(pInfo->vgInfo.epAddr[j].fqdn);
// }
taosArrayDestroy(pInfo->itemList);
taosArrayRemove(pVgroupTable, index);
......@@ -3392,9 +3392,12 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
memset(info, 0, sizeof(SVgroupTableInfo));
info->vgInfo = pInfo->vgInfo;
#if 0
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
#endif
if (pInfo->itemList) {
info->itemList = taosArrayDup(pInfo->itemList);
......@@ -3615,7 +3618,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
tsem_init(&pNew->rspSem, 0, 0);
SSqlCmd* pnCmd = &pNew->cmd;
......@@ -3749,7 +3751,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pTableMeta, pTableMetaInfo->vgroupList,
pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables);
} else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, 0);
if (pPrevInfo->pTableMeta && pPrevInfo->pTableMeta->tableType < 0) {
......@@ -3759,8 +3760,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
STableMeta* pPrevTableMeta = tscTableMetaDup(pPrevInfo->pTableMeta);
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList,
pTableMetaInfo->pVgroupTables);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pPrevTableMeta, pVgroupsInfo,
pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables);
}
// this case cannot be happened
......@@ -4404,7 +4405,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
return NULL;
}
size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupList->numOfVgroups;
size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupList->numOfVgroups;
SVgroupsInfo* pNew = calloc(1, size);
if (pNew == NULL) {
return NULL;
......@@ -4413,15 +4414,16 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
pNew->numOfVgroups = vgroupList->numOfVgroups;
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SVgroupInfo* pNewVInfo = &pNew->vgroups[i];
SVgroupMsg* pNewVInfo = &pNew->vgroups[i];
SVgroupInfo* pvInfo = &vgroupList->vgroups[i];
SVgroupMsg* pvInfo = &vgroupList->vgroups[i];
pNewVInfo->vgId = pvInfo->vgId;
pNewVInfo->numOfEps = pvInfo->numOfEps;
for(int32_t j = 0; j < pvInfo->numOfEps; ++j) {
pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn);
// pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn);
pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port;
tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN);
}
}
......@@ -4433,8 +4435,9 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
return NULL;
}
#if 0
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
SVgroupMsg* pVgroupInfo = &vgroupList->vgroups[i];
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
tfree(pVgroupInfo->epAddr[j].fqdn);
......@@ -4445,10 +4448,11 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
}
}
#endif
tfree(vgroupList);
return NULL;
}
# if 0
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) {
dst->vgId = src->vgId;
dst->numOfEps = src->numOfEps;
......@@ -4461,6 +4465,8 @@ void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) {
}
}
#endif
char* serializeTagData(STagData* pTagData, char* pMsg) {
int32_t n = (int32_t) strlen(pTagData->name);
*(int32_t*) pMsg = htonl(n);
......@@ -4601,11 +4607,12 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) {
assert(pVgroupsInfo != NULL);
size_t size = sizeof(SVgroupInfo) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo);
size_t size = sizeof(SVgroupMsg) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo);
SVgroupsInfo* pInfo = calloc(1, size);
pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups;
for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) {
tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]);
memcpy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m], sizeof(SVgroupMsg));
// tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]);
}
return pInfo;
}
......
......@@ -766,11 +766,11 @@ typedef struct SSTableVgroupMsg {
int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct {
int32_t vgId;
int8_t numOfEps;
SEpAddr1 epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo;
//typedef struct {
// int32_t vgId;
// int8_t numOfEps;
// SEpAddr1 epAddr[TSDB_MAX_REPLICA];
//} SVgroupInfo;
typedef struct {
int32_t vgId;
......@@ -780,7 +780,7 @@ typedef struct {
typedef struct {
int32_t numOfVgroups;
SVgroupInfo vgroups[];
SVgroupMsg vgroups[];
} SVgroupsInfo;
typedef struct {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册