未验证 提交 758bd5dc 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2405 from taosdata/feature/query

Feature/query
......@@ -89,27 +89,21 @@ typedef struct SVgroupTableInfo {
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset);
SDataBlockList* tscCreateBlockArrayList();
void* tscDestroyBlockArrayList(SDataBlockList* pList);
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
void tscFreeUnusedDataBlocks(SArray* pDataBlockList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks);
//UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/**
*
* for the projection query on metric or point interpolation query on metric,
* we iterate all the meters, instead of invoke query on all qualified meters simultaneously.
*
......
......@@ -183,11 +183,11 @@ typedef struct STableDataBlocks {
SParamInfo *params;
} STableDataBlocks;
typedef struct SDataBlockList { // todo remove
uint32_t nSize;
uint32_t nAlloc;
STableDataBlocks **pData;
} SDataBlockList;
//typedef struct SDataBlockList { // todo remove
// uint32_t nSize;
// uint32_t nAlloc;
// STableDataBlocks **pData;
//} SDataBlockList;
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
......@@ -238,8 +238,7 @@ typedef struct {
void * pTableList; // referred table involved in sql
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
} SSqlCmd;
typedef struct SResRec {
......
......@@ -1040,8 +1040,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (NULL == pCmd->pTableList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
......@@ -1174,7 +1173,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _error;
}
tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
taosArrayPush(pCmd->pDataBlocks, &pDataBlock);
strcpy(pDataBlock->filename, fname);
} else if (sToken.type == TK_LP) {
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
......@@ -1262,7 +1261,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean;
}
if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId
if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -1372,8 +1371,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code;
}
// the pDataBlock is different from the pTableDataBlocks
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1404,15 +1402,15 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int32_t rowSize = tinfo.rowSize;
pCmd->pDataBlocks = tscCreateBlockArrayList();
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
STableDataBlocks *pTableDataBlock = NULL;
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk),
pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
return ret;
}
tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);
taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock);
code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows);
if (TSDB_CODE_SUCCESS != code) return -1;
......@@ -1446,7 +1444,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
return -code;
}
pTableDataBlock = pCmd->pDataBlocks->pData[0];
pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
pTableDataBlock->size = sizeof(SSubmitBlk);
pTableDataBlock->rowSize = tinfo.rowSize;
......@@ -1483,13 +1481,14 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
int32_t affected_rows = 0;
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL);
SDataBlockList *pDataBlockList = pCmd->pDataBlocks;
SArray *pDataBlockList = pCmd->pDataBlocks;
pCmd->pDataBlocks = NULL;
char path[PATH_MAX] = {0};
for (int32_t i = 0; i < pDataBlockList->nSize; ++i) {
pDataBlock = pDataBlockList->pData[i];
size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; ++i) {
pDataBlock = taosArrayGetP(pDataBlockList, i );
if (pDataBlock == NULL) {
continue;
}
......
......@@ -331,8 +331,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
binded = pCmd->batchSize / 2;
}
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
size_t size = taosArrayGetSize(pCmd->pDataBlocks);
for (int32_t i = 0; i < size; ++i) {
STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
uint32_t dataSize = totalDataSize / alloced;
assert(dataSize * alloced == totalDataSize);
......@@ -370,8 +371,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
size_t total = taosArrayGetSize(pCmd->pDataBlocks);
for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size += totalDataSize / alloced;
......@@ -395,8 +397,10 @@ static int insertStmtReset(STscStmt* pStmt) {
SSqlCmd* pCmd = &pStmt->pSql->cmd;
if (pCmd->batchSize > 2) {
int32_t alloced = (pCmd->batchSize + 1) / 2;
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
size_t size = taosArrayGetSize(pCmd->pDataBlocks);
for (int32_t i = 0; i < size; ++i) {
STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced;
......@@ -423,15 +427,15 @@ static int insertStmtExecute(STscStmt* stmt) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
if (pCmd->pDataBlocks->nSize > 0) {
if (taosArrayGetSize(pCmd->pDataBlocks) > 0) {
// merge according to vgid
int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
code = tscCopyDataBlockToPayload(stmt->pSql, pDataBlock);
if (code != TSDB_CODE_SUCCESS) {
return code;
......
......@@ -4463,7 +4463,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize);
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
pUpdateMsg->tid = htonl(pTableMeta->sid);
pUpdateMsg->uid = htobe64(pTableMeta->uid);
......
......@@ -176,18 +176,16 @@ int tscSendMsgToServer(SSqlObj *pSql) {
char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (pSql->cmd.command < TSDB_SQL_MGMT) {
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
} else {
// set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) {
pSql->ipList = tscMgmtIpSet;
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
}
// tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
......@@ -222,8 +220,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscTrace("%p sqlObj needs to be released or DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command,
pObj, pObj->signature);
tscTrace("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
tscFreeSqlObj(pSql);
rpcFreeCont(rpcMsg->pCont);
......@@ -449,18 +447,11 @@ void tscKillSTableQuery(SSqlObj *pSql) {
}
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart;
pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart;
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg;
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
pMsg += sizeof(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
pRetrieveMsg->free = htons(pQueryInfo->type);
pMsg += sizeof(pQueryInfo->type);
// todo valid the vgroupId at the client side
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -474,12 +465,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
}
pMsg += sizeof(SRetrieveTableMsg);
pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
return TSDB_CODE_SUCCESS;
}
......@@ -487,30 +478,30 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
char* pMsg = pSql->cmd.payload + tsRpcHeadSize;
char* pMsg = pSql->cmd.payload;
// NOTE: shell message size should not include SMsgDesc
int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
int32_t vgId = pTableMeta->vgroupInfo.vgId;
SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
pMsgDesc->numOfVnodes = htonl(1); //todo set the right number of vnodes
pMsgDesc->numOfVnodes = htonl(1); // always one vnode
pMsg += sizeof(SMsgDesc);
SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
int32_t vgId = pTableMeta->vgroupInfo.vgId;
pShellMsg->header.vgId = htonl(vgId);
pShellMsg->header.contLen = htonl(size);
pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc
pShellMsg->length = pShellMsg->header.contLen;
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted
// pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);
tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps);
tscTrace("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
pSql->ipList.numOfIps);
return TSDB_CODE_SUCCESS;
}
......@@ -620,9 +611,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return -1;
}
char *pStart = pCmd->payload + tsRpcHeadSize;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
......@@ -821,7 +810,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
// compressed ts block
pQueryMsg->tsOffset = htonl(pMsg - pStart);
pQueryMsg->tsOffset = htonl(pMsg - pCmd->payload);
int32_t tsLen = 0;
int32_t numOfBlocks = 0;
......@@ -844,7 +833,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
}
int32_t msgLen = pMsg - pStart;
int32_t msgLen = pMsg - pCmd->payload;
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen;
......@@ -1286,10 +1275,12 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize);
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo);
return TSDB_CODE_SUCCESS;
......@@ -1552,150 +1543,6 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
//}
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
#if 0
SSuperTableMetaMsg *pMetaMsg;
char * pMsg, *pStart;
int msgLen = 0;
int tableIndex = 0;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STagCond *pTagCond = &pQueryInfo->tagCond;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for metric meter msg", pSql);
return -1;
}
pStart = pCmd->payload + tsRpcHeadSize;
pMsg = pStart;
SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pMgmt->db);
pMsg += sizeof(SMgmtHead);
pMetaMsg = (SSuperTableMetaMsg *)pMsg;
pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables);
pMsg += sizeof(SSuperTableMetaMsg);
int32_t offset = pMsg - (char *)pMetaMsg;
pMetaMsg->join = htonl(offset);
// todo refactor
pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
pMsg += TSDB_TABLE_ID_LEN;
*(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
pMsg += sizeof(int16_t);
memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
pMsg += TSDB_TABLE_ID_LEN;
*(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
pMsg += sizeof(int16_t);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
offset = pMsg - (char *)pMetaMsg;
pMetaMsg->metaElem[i] = htonl(offset);
SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg;
pMsg += sizeof(SSuperTableMetaElemMsg);
// convert to unicode before sending to mnode for metric query
int32_t condLen = 0;
if (pTagCond->numOfTagCond > 0) {
SCond *pCond = tsGetSTableQueryCond(pTagCond, uid);
if (pCond != NULL && pCond->cond != NULL) {
condLen = strlen(pCond->cond) + 1;
bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
if (!ret) {
tscError("%p mbs to ucs4 failed:%s", pSql, tsGetSTableQueryCond(pTagCond, uid));
return 0;
}
}
}
pElem->condLen = htonl(condLen);
offset = pMsg - (char *)pMetaMsg;
pElem->cond = htonl(offset);
pMsg += condLen * TSDB_NCHAR_SIZE;
pElem->rel = htons(pTagCond->relType);
if (pTagCond->tbnameCond.uid == uid) {
offset = pMsg - (char *)pMetaMsg;
pElem->tableCond = htonl(offset);
uint32_t len = 0;
if (pTagCond->tbnameCond.cond != NULL) {
len = strlen(pTagCond->tbnameCond.cond);
memcpy(pMsg, pTagCond->tbnameCond.cond, len);
}
pElem->tableCondLen = htonl(len);
pMsg += len;
}
SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
pElem->orderType = 0;
pElem->orderIndex = 0;
pElem->numOfGroupCols = 0;
} else {
pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
pElem->tagCols[j] = htons(pTableMetaInfo->tagColumnIndex[j]);
}
if (pGroupby->numOfGroupCols != 0) {
pElem->orderIndex = htons(pGroupby->orderIndex);
pElem->orderType = htons(pGroupby->orderType);
offset = pMsg - (char *)pMetaMsg;
pElem->groupbyTagColumnList = htonl(offset);
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
SColIndex *pDestCol = (SColIndex *)pMsg;
pDestCol->colIdxInBuf = 0;
pDestCol->colIndex = htons(pCol->colIndex);
pDestCol->colId = htons(pDestCol->colId);
pDestCol->flag = htons(pDestCol->flag);
strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));
pMsg += sizeof(SColIndex);
}
}
}
strcpy(pElem->tableId, pTableMetaInfo->name);
pElem->numOfTags = htons(pTableMetaInfo->numOfTags);
int16_t len = pMsg - (char *)pElem;
pElem->elemLen = htons(len); // redundant data for integrate check
}
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
assert(msgLen + minMsgSize() <= size);
#endif
SSqlCmd *pCmd = &pSql->cmd;
char* pMsg = pCmd->payload;
......@@ -1795,7 +1642,11 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
for (int i = 0; i < numOfTotalCols; ++i) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId);
if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(i == 0);
}
assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
pSchema++;
}
......
......@@ -133,8 +133,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL;
}
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql;
}
......
......@@ -180,6 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
getTmpfilePath("join-", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
// todo handle error
if (pSupporter->f == NULL) {
tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno));
}
......@@ -234,7 +235,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
/*
* launch secondary stage query to fetch the result that contains timestamp in set
*/
static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL;
......@@ -249,7 +250,7 @@ static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it
tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState* pState = pSupporter->pState;
......@@ -451,7 +452,7 @@ static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupport
freeJoinSubqueryObj(pParentSql);
} else {
updateQueryTimeRange(pParentQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql);
tscLaunchRealSubqueries(pParentSql);
}
}
......@@ -851,7 +852,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
updateQueryTimeRange(pPQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql);
tscLaunchRealSubqueries(pParentSql);
}
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
......@@ -1159,7 +1160,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
// todo merge with callback
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......@@ -1302,7 +1302,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
pState->numOfTotal = pQueryInfo->numOfTables;
pState->numOfRemain = pState->numOfTotal;
tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables);
tscTrace("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
......@@ -1848,8 +1848,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql;
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
// record the total inserted rows
......@@ -1864,7 +1862,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj->res.code = pSql->res.code;
}
// it is not the initial sqlObj, free it
taos_free_result(tres);
tfree(pSupporter);
......@@ -1876,7 +1873,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// release data block data
tfree(pState);
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp
pParentObj->fp = pParentObj->fetchFp;
......@@ -1889,12 +1886,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES);
pSql->numOfSubs = pDataBlocks->nSize;
assert(pDataBlocks->nSize > 0);
tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize);
size_t size = taosArrayGetSize(pCmd->pDataBlocks);
assert(size > 0);
pSql->pSubs = calloc(size, POINTER_BYTES);
pSql->numOfSubs = size;
tscTrace("%p submit data to %zu vnode(s)", pSql, size);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs;
pState->numOfRemain = pSql->numOfSubs;
......@@ -1920,12 +1919,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pNew->fetchFp = pNew->fp;
pSql->pSubs[numOfSub] = pNew;
pRes->code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[numOfSub++]);
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub);
pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
numOfSub++;
} else {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub,
pDataBlocks->nSize, tstrerror(pRes->code));
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%zu, code:%s", pSql, numOfSub,
size, tstrerror(pRes->code));
goto _error;
}
}
......@@ -1942,7 +1943,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
tscProcessSql(pSub);
}
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
return TSDB_CODE_SUCCESS;
_error:
......
......@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscUtil.h"
#include "hash.h"
#include "os.h"
#include "qast.h"
#include "taosmsg.h"
......@@ -29,6 +27,8 @@
#include "ttimer.h"
#include "ttokendef.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "hash.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
......@@ -428,48 +428,18 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
return param;
}
SDataBlockList* tscCreateBlockArrayList() {
const int32_t DEFAULT_INITIAL_NUM_OF_BLOCK = 16;
SDataBlockList* pDataBlockArrayList = calloc(1, sizeof(SDataBlockList));
if (pDataBlockArrayList == NULL) {
void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
if (pDataBlockList == NULL) {
return NULL;
}
pDataBlockArrayList->nAlloc = DEFAULT_INITIAL_NUM_OF_BLOCK;
pDataBlockArrayList->pData = calloc(1, POINTER_BYTES * pDataBlockArrayList->nAlloc);
if (pDataBlockArrayList->pData == NULL) {
free(pDataBlockArrayList);
return NULL;
}
return pDataBlockArrayList;
}
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) {
if (pList->nSize >= pList->nAlloc) {
pList->nAlloc = (pList->nAlloc) << 1U;
pList->pData = realloc(pList->pData, POINTER_BYTES * (size_t)pList->nAlloc);
// reset allocated memory
memset(pList->pData + pList->nSize, 0, POINTER_BYTES * (pList->nAlloc - pList->nSize));
size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; i++) {
void* d = taosArrayGetP(pDataBlockList, i);
tscDestroyDataBlock(d);
}
pList->pData[pList->nSize++] = pBlocks;
}
void* tscDestroyBlockArrayList(SDataBlockList* pList) {
if (pList == NULL) {
return NULL;
}
for (int32_t i = 0; i < pList->nSize; i++) {
tscDestroyDataBlock(pList->pData[i]);
}
tfree(pList->pData);
tfree(pList);
taosArrayDestroy(pDataBlockList);
return NULL;
}
......@@ -484,7 +454,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
strcpy(pTableMetaInfo->name, pDataBlock->tableId);
tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name));
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta);
......@@ -497,31 +467,32 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs
* additional space.
*/
int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100);
int ret = tscAllocPayload(pCmd, pDataBlock->size + 100);
if (TSDB_CODE_SUCCESS != ret) {
return ret;
}
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize);
assert(pDataBlock->size <= pDataBlock->nAllocSize);
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->size);
/*
* the payloadLen should be actual message body size
* the old value of payloadLen is the allocated payload size
*/
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
pCmd->payloadLen = pDataBlock->size;
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0);
assert(pCmd->allocSize >= pCmd->payloadLen + 100 && pCmd->payloadLen > 0);
return TSDB_CODE_SUCCESS;
}
void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
/* release additional memory consumption */
for (int32_t i = 0; i < pList->nSize; ++i) {
STableDataBlocks* pDataBlock = pList->pData[i];
pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size);
pDataBlock->nAllocSize = (uint32_t)pDataBlock->size;
}
}
//void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
// /* release additional memory consumption */
// for (int32_t i = 0; i < pList->nSize; ++i) {
// STableDataBlocks* pDataBlock = pList->pData[i];
// pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size);
// pDataBlock->nAllocSize = (uint32_t)pDataBlock->size;
// }
//}
/**
* create the in-memory buffer for each table to keep the submitted data block
......@@ -568,7 +539,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_SUCCESS;
}
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks) {
*dataBlocks = NULL;
......@@ -585,7 +556,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
}
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
tscAppendDataBlock(pDataBlockList, *dataBlocks);
taosArrayPush(pDataBlockList, dataBlocks);
}
return TSDB_CODE_SUCCESS;
......@@ -634,14 +605,15 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
return len;
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList();
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) {
STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];
size_t total = taosArrayGetSize(pTableDataBlockList);
for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
STableDataBlocks* dataBuf = NULL;
......@@ -679,10 +651,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
tscSortRemoveDataBlockDupRows(pOneTableBlock);
char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(e));
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2);
......@@ -704,7 +676,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
// free the table data blocks;
pCmd->pDataBlocks = pVnodeDataBlockList;
tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
// tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_SUCCESS;
......
......@@ -244,7 +244,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100)
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
#define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024
......
......@@ -105,8 +105,9 @@ typedef struct {
void tsdbClearTableCfg(STableCfg *config);
void * tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes);
char * tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id);
void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes);
char* tsdbGetTableName(void *pTable);
STableId tsdbGetTableId(void *pTable);
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
......@@ -176,18 +177,16 @@ typedef struct SQueryRowCond {
TSKEY ts;
} SQueryRowCond;
typedef void *TsdbPosT;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param groupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param tableqinfoGroupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo);
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
......@@ -197,12 +196,17 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each
* block
* @param groupInfo tableId list.
* @param tableqinfoGroupInfo tableId list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo);
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo);
SArray *tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
/**
* get the queried table object list
* @param pHandle
* @return
*/
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
void *qinfo);
......@@ -247,37 +251,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta
*/
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList);
/**
* todo remove this function later
* @param pQueryHandle
* @param pIdList
* @return
*/
SArray *tsdbRetrieveDataRow(TsdbQueryHandleT *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond);
/**
* Get iterator for super tables, of which tags values satisfy the tag filter info
*
* NOTE: the tagFilterStr is an bin-expression for tag filter, such as ((tag_col = 5) and (tag_col2 > 7))
* The filter string is sent from client directly.
* The build of the tags filter expression from string is done in the iterator generating function.
*
* @param pCond query condition
* @param pTagFilterStr tag filter info
* @return
*/
TsdbQueryHandleT *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr);
/**
* Get the qualified tables for (super) table query.
* Used to handle the super table projection queries, the last_row query, the group by on normal columns query,
* the interpolation query, and timestamp-comp query for join processing.
*
* @param pQueryHandle
* @return table sid list. the invoker is responsible for the release of this the sid list.
*/
SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle);
/**
* Get the qualified table id for a super table according to the tag query expression.
* @param stableid. super table sid
......@@ -287,6 +260,12 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pT
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols);
/**
* destory the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestoryTableGroup(STableGroupInfo *pGroupList);
/**
* create the table group result including only one table, used to handle the normal table query
*
......
......@@ -102,7 +102,7 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
int64_t tag;
STimeWindow win;
STSCursor cur;
STableId id; // for retrieve the page id list
void* pTable; // for retrieve the page id list
SWindowResInfo windowResInfo;
} STableQueryInfo;
......@@ -126,10 +126,10 @@ typedef struct SQueryCostInfo {
uint64_t computTime;
} SQueryCostInfo;
typedef struct SGroupItem {
STableId id;
STableQueryInfo* info;
} SGroupItem;
//typedef struct SGroupItem {
// void *pTable;
// STableQueryInfo *info;
//} SGroupItem;
typedef struct SQuery {
int16_t numOfCols;
......@@ -187,8 +187,8 @@ typedef struct SQInfo {
void* tsdb;
int32_t vgId;
STableGroupInfo tableIdGroupInfo; // table id list < only includes the STableId list>
STableGroupInfo groupInfo; //
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv;
int32_t groupIndex;
int32_t offset; // offset in group result set of subgroup, todo refactor
......
此差异已折叠。
......@@ -114,8 +114,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if (pResult->status.closed) { // remove the window slot from hash table
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, pWindowResInfo->type);
printf("remove ============>%ld, remain size:%ld\n", pResult->window.skey, pWindowResInfo->hashList->size);
} else {
break;
}
......
......@@ -72,7 +72,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
if (ret != 0) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
return -TSDB_CODE_QRY_NO_DISKSPACE;
return TSDB_CODE_QRY_NO_DISKSPACE;
}
pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE;
......@@ -80,7 +80,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return -TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
......
......@@ -158,18 +158,16 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
return 0;
}
void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes) {
void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes) {
// TODO: this function should be changed also
STsdbMeta *pMeta = tsdbGetMeta(repo);
STable * pTable = tsdbGetTableByUid(pMeta, id->uid);
STSchema *pSchema = tsdbGetTableTagSchema(pTable);
STSchema *pSchema = tsdbGetTableTagSchema((STable*) pTable);
STColumn *pCol = tdGetColOfID(pSchema, colId);
if (pCol == NULL) {
return NULL; // No matched tag volumn
}
char *val = tdGetKVRowValOfCol(pTable->tagVal, colId);
char *val = tdGetKVRowValOfCol(((STable*)pTable)->tagVal, colId);
assert(type == pCol->type && bytes == pCol->bytes);
if (val != NULL && IS_VAR_DATA_TYPE(type)) {
......@@ -179,20 +177,21 @@ void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, i
return val;
}
char *tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id) {
char *tsdbGetTableName(void* pTable) {
// TODO: need to change as thread-safe
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
STable *pTable = tsdbGetTableByUid(pMeta, id->uid);
if (pTable == NULL) {
return NULL;
} else {
return (char *)pTable->name;
return (char*) (((STable *)pTable)->name);
}
}
STableId tsdbGetTableId(void *pTable) {
assert(pTable);
return ((STable*)pTable)->tableId;
}
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if (pMsg == NULL) return NULL;
......
......@@ -184,15 +184,17 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) {
STableId* id = (STableId*) taosArrayGet(group, j);
STable* pTable = (STable*) taosArrayGetP(group, j);
STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey,
.tableId = *id,
.pTableObj = tsdbGetTableByUid(pMeta, id->uid),
.tableId = pTable->tableId,
.pTableObj = pTable,
};
assert(info.pTableObj != NULL && info.pTableObj->tableId.tid == id->tid);
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
info.pTableObj->type == TSDB_CHILD_TABLE));
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
}
}
......@@ -215,17 +217,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
return pQueryHandle;
}
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) {
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
assert(pHandle != NULL);
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
SArray* res = taosArrayInit(size, sizeof(STableId));
SArray* res = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
taosArrayPush(res, &pCheckInfo->tableId);
taosArrayPush(res, &pCheckInfo->pTableObj);
}
return res;
......@@ -1052,7 +1054,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
tSkipListIterNext(pCheckInfo->iter);
moveToNextRow(pCheckInfo);
}
int32_t start = -1;
......@@ -1607,6 +1609,7 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
}
if (index == -1) {
// todo add failure test cases
return;
}
......@@ -1856,42 +1859,19 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; }
TsdbQueryHandleT* tsdbQueryFromTagConds(STsdbQueryCond* pCond, int16_t stableId, const char* pTagFilterStr) {
return NULL;
}
SArray* tsdbGetTableList(TsdbQueryHandleT* pQueryHandle) { return NULL; }
static int32_t getAllTableIdList(STable* pSuperTable, SArray* list) {
static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
while (tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
taosArrayPush(list, &(*pTable)->tableId);
taosArrayPush(list, pTable);
}
tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS;
}
/**
* convert the result pointer to table id instead of table object pointer
* todo remove it by using callback function to change the final result in-time.
* @param pRes
*/
static void convertQueryResult(SArray* pRes, SArray* pTableList) {
if (pTableList == NULL || taosArrayGetSize(pTableList) == 0) {
return;
}
size_t size = taosArrayGetSize(pTableList);
for (int32_t i = 0; i < size; ++i) { // todo speedup by using reserve space.
STable* pTable = taosArrayGetP(pTableList, i);
taosArrayPush(pRes, &pTable->tableId);
}
}
static void destroyHelper(void* param) {
if (param == NULL) {
return;
......@@ -1960,16 +1940,13 @@ typedef struct STableGroupSupporter {
int32_t numOfCols;
SColIndex* pCols;
STSchema* pTagSchema;
void* tsdbMeta;
// void* tsdbMeta;
} STableGroupSupporter;
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STableId* id1 = (STableId*) p1;
STableId* id2 = (STableId*) p2;
STable *pTable1 = tsdbGetTableByUid(pTableGroupSupp->tsdbMeta, id1->uid);
STable *pTable2 = tsdbGetTableByUid(pTableGroupSupp->tsdbMeta, id2->uid);
STable* pTable1 = *(STable**) p1;
STable* pTable2 = *(STable**) p2;
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
......@@ -2019,26 +1996,29 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
return 0;
}
void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTables, STableGroupSupporter* pSupp,
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
__ext_compar_fn_t compareFn) {
STableId* pId = taosArrayGet(pTableIdList, 0);
SArray* g = taosArrayInit(16, sizeof(STableId));
taosArrayPush(g, pId);
STable* pTable = taosArrayGetP(pTableList, 0);
SArray* g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTable);
tsdbRefTable(pTable);
for (int32_t i = 1; i < numOfTables; ++i) {
STableId* prev = taosArrayGet(pTableIdList, i - 1);
STableId* p = taosArrayGet(pTableIdList, i);
STable** prev = taosArrayGet(pTableList, i - 1);
STable** p = taosArrayGet(pTableList, i);
int32_t ret = compareFn(prev, p, pSupp);
assert(ret == 0 || ret == -1);
tsdbRefTable(*p);
assert((*p)->type == TSDB_CHILD_TABLE);
if (ret == 0) {
taosArrayPush(g, p);
} else {
taosArrayPush(pGroups, &g); // current group is ended, start a new group
g = taosArrayInit(16, sizeof(STableId));
g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, p);
}
}
......@@ -2046,8 +2026,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTab
taosArrayPush(pGroups, &g);
}
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols,
TSDB_REPO_T* tsdb) {
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
......@@ -2058,22 +2037,24 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
SArray* sa = taosArrayInit(size, sizeof(STableId));
SArray* sa = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) {
STableId* tableId = taosArrayGet(pTableList, i);
taosArrayPush(sa, tableId);
STable** pTable = taosArrayGet(pTableList, i);
assert((*pTable)->type == TSDB_CHILD_TABLE);
tsdbRefTable(*pTable);
taosArrayPush(sa, pTable);
}
taosArrayPush(pTableGroup, &sa);
tsdbTrace("all %zu tables belong to one group", size);
} else {
STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter));
pSupp->tsdbMeta = tsdbGetMeta(tsdb);
pSupp->numOfCols = numOfOrderCols;
pSupp->pTagSchema = pTagSchema;
pSupp->pCols = pCols;
taosqsort(pTableList->pData, size, sizeof(STableId), pSupp, tableGroupComparFn);
taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn);
tfree(pSupp);
}
......@@ -2149,48 +2130,53 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
.pExtInfo = pSTable->tagSchema,
};
SArray* pTableList = taosArrayInit(8, POINTER_BYTES);
tExprTreeTraverse(pExpr, pSTable->pIndex, pTableList, &supp);
tExprTreeTraverse(pExpr, pSTable->pIndex, pRes, &supp);
tExprTreeDestroy(&pExpr, destroyHelper);
convertQueryResult(pRes, pTableList);
taosArrayDestroy(pTableList);
return TSDB_CODE_SUCCESS;
}
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
return TSDB_CODE_TDB_INVALID_TABLE_ID;
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
tsdbUnlockRepoMeta(tsdb);
goto _error;
}
if (pTable->type != TSDB_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid,
pTable->name->data);
return TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
terrno = TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
tsdbUnlockRepoMeta(tsdb);
goto _error;
}
SArray* res = taosArrayInit(8, sizeof(STableId));
//NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, POINTER_BYTES);
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
// no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
int32_t ret = getAllTableIdList(pTable, res);
if (ret == TSDB_CODE_SUCCESS) {
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb);
tsdbTrace("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
} else {
// todo add error
int32_t ret = getAllTableList(pTable, res);
if (ret != TSDB_CODE_SUCCESS) {
tsdbUnlockRepoMeta(tsdb);
goto _error;
}
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
tsdbTrace("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
taosArrayDestroy(res);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
return ret;
}
......@@ -2227,31 +2213,45 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
doQueryTableList(pTable, res, expr);
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
tsdbTrace("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid,
pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
taosArrayDestroy(res);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
return ret;
_error:
return terrno;
}
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
return TSDB_CODE_TDB_INVALID_TABLE_ID;
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
goto _error;
}
//todo assert table type, add the table ref count
assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE);
tsdbRefTable(pTable);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableId));
SArray* group = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(group, &pTable->tableId);
taosArrayPush(group, &pTable);
taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS;
_error:
return terrno;
}
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
......@@ -2263,12 +2263,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->mem);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->imem);
tSkipListDestroyIter(pTableCheckInfo->iter);
if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf);
}
......@@ -2293,3 +2292,26 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tfree(pQueryHandle);
}
void tsdbDestoryTableGroup(STableGroupInfo *pGroupList) {
assert(pGroupList != NULL);
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
for(int32_t i = 0; i < numOfGroup; ++i) {
SArray* p = taosArrayGetP(pGroupList->pGroupList, i);
size_t numOfTables = taosArrayGetSize(p);
for(int32_t j = 0; j < numOfTables; ++j) {
STable* pTable = taosArrayGetP(p, j);
assert(pTable != NULL);
tsdbUnRefTable(pTable);
}
taosArrayDestroy(p);
}
taosArrayDestroy(pGroupList->pGroupList);
}
......@@ -68,6 +68,7 @@ while $loop <= $loops
while $i < 10
sql select count(*) from $stb where t1 = $i
if $data00 != $rowNum then
print expect $rowNum, actual: $data00
return -1
endi
$i = $i + 1
......
......@@ -172,6 +172,7 @@ while $loop <= $loops
endi
sql select c8 from $stb where t1 = $i
if $rows != $rowNum then
print expect $rowNum, actual: $rows
return -1
endi
sql select c9 from $stb where t1 = $i
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册