You need to sign in or sign up before continuing.
未验证 提交 2f7fb6e1 编写于 作者: J Jeff Tao 提交者: GitHub

Merge branch '2.0' into refact/sync

......@@ -12,7 +12,6 @@ ADD_SUBDIRECTORY(client)
ADD_SUBDIRECTORY(query)
ADD_SUBDIRECTORY(kit)
ADD_SUBDIRECTORY(plugins)
ADD_SUBDIRECTORY(sdb)
ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY(vnode)
ADD_SUBDIRECTORY(dnode)
......
......@@ -32,7 +32,7 @@ extern "C" {
#define UTIL_TABLE_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_NOMRAL_TABLE(metaInfo) (!(UTIL_TABLE_IS_SUPERTABLE(metaInfo)))
#define UTIL_TABLE_CREATE_FROM_STABLE(metaInfo) \
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
......@@ -106,7 +106,7 @@ bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SQueryInfo* pQueryInfo);
bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
......@@ -176,8 +176,8 @@ void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex);
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
SCond* tsGetSTableQueryCondPos(STagCond* pCond, uint64_t tableIndex);
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tscTagCondCopy(STagCond* dest, const STagCond* src);
void tscTagCondRelease(STagCond* pCond);
......@@ -207,7 +207,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
......
......@@ -70,7 +70,8 @@ typedef struct STableMeta {
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
SSuperTableMeta *pMetricMeta; // metricmeta
// SSuperTableMeta *pMetricMeta; // metricmeta
SArray* vgroupIdList;
/*
* 1. keep the vnode index during the multi-vnode super table projection query
......@@ -294,20 +295,21 @@ typedef struct SResRec {
struct STSBuf;
typedef struct {
int32_t code;
int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp;
int rspType;
int rspLen;
int32_t rspType;
int32_t rspLen;
uint64_t qhandle;
int64_t uid;
int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable
int row;
int32_t row;
int16_t numOfCols;
int16_t precision;
bool completed;
int32_t code;
int32_t numOfGroups;
SResRec * pGroupRec;
char * data;
......
......@@ -406,11 +406,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL) {
tscError("%p callBack is NULL!!!", pSql);
return;
}
if (pSql->fp == (void *)1) {
pSql->fp = NULL;
......@@ -464,7 +459,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
code = tscGetMetricMeta(pSql, 0);
code = tscGetSTableVgroupInfo(pSql, 0);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -494,7 +489,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
code = tscGetMetricMeta(pSql, pCmd->clauseIndex);
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -519,7 +514,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTansformSQLFunctionForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream);
} else {
tscTrace("%p get tableMeta/metricMeta successfully", pSql);
tscTrace("%p get tableMeta successfully", pSql);
}
tscDoQuery(pSql);
......
......@@ -417,6 +417,10 @@ static void count_function(SQLFunctionCtx *pCtx) {
numOfElem += 1;
}
} else {
/*
* when counting on the primary time stamp column and no statistics data is provided,
* simple use the size value
*/
numOfElem = pCtx->size;
}
}
......
......@@ -286,6 +286,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
// todo add order support
static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
#if 0
// the result structure has been completed in sql parse, so we
// only need to reorganize the results in the column format
SSqlCmd * pCmd = &pSql->cmd;
......@@ -337,6 +338,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
}
}
#endif
return 0;
}
......@@ -345,7 +347,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
#if 0
SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta;
int32_t totalNumOfResults = 1; // count function only produce one result
int32_t rowLen = tscGetResRowLength(pQueryInfo);
......@@ -369,6 +371,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
}
rowIdx++;
}
#endif
return 0;
}
......
......@@ -18,9 +18,10 @@
#define _XOPEN_SOURCE
#include "hash.h"
#include "os.h"
#include "tscSecondaryMerge.h"
#include "hash.h"
//#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
......@@ -32,6 +33,8 @@
#include "tstoken.h"
#include "ttime.h"
#include "dataformat.h"
enum {
TSDB_USE_SERVER_TS = 0,
TSDB_USE_CLI_TS = 1,
......@@ -393,7 +396,6 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0;
// bool isPrevOptr; //fang, never used
SSQLToken sToken = {0};
char * payload = pDataBlocks->pData + pDataBlocks->size;
......@@ -604,8 +606,8 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
return TSDB_CODE_SUCCESS;
}
static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->sid = pTableMeta->sid;
static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->tid = pTableMeta->sid;
pBlocks->uid = pTableMeta->uid;
pBlocks->sversion = pTableMeta->sversion;
pBlocks->numOfRows += numOfRows;
......@@ -613,10 +615,10 @@ static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableM
// data block is disordered, sort it in ascending order
void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)dataBuf->pData;
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet.
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
// if use server time, this block must be ordered
if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
......@@ -624,7 +626,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
}
if (!dataBuf->ordered) {
char *pBlockData = pBlocks->payLoad;
char *pBlockData = pBlocks->data;
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
int32_t i = 0;
......@@ -650,7 +652,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize * pBlocks->numOfRows;
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
}
}
......@@ -663,7 +665,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SShellSubmitBlock), tinfo.rowSize, pTableMetaInfo->name,
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
......@@ -691,11 +693,11 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
SParamInfo *param = dataBuf->params + i;
if (param->idx == -1) {
param->idx = pCmd->numOfParams++;
param->offset -= sizeof(SShellSubmitBlock);
param->offset -= sizeof(SSubmitBlk);
}
}
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData);
SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
dataBuf->vgId = pTableMeta->vgId;
......@@ -1141,7 +1143,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
STableDataBlocks *pDataBlock = NULL;
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SShellSubmitBlock), pTableMetaInfo->name,
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name,
pTableMeta, &pDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
goto _error_clean;
......@@ -1236,22 +1238,10 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _clean;
}
// submit to more than one vnode
if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgId
if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
// set the next sent data vnode index in data block arraylist
pTableMetaInfo->vnodeIndex = 1;
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
}
......@@ -1303,12 +1293,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
int32_t ret = TSDB_CODE_SUCCESS;
if (NULL == pSql->asyncTblPos) {
tscCleanSqlCmd(&pSql->cmd);
} else {
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
}
if (tscIsInsertOrImportData(pSql->sqlstr)) {
/*
......@@ -1353,7 +1338,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
assert(pCmd->numOfClause == 1);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData);
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
......@@ -1394,7 +1379,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
pCmd->pDataBlocks = tscCreateBlockArrayList();
STableDataBlocks *pTableDataBlock = NULL;
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SShellSubmitBlock),
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk),
pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
......@@ -1435,7 +1420,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
}
pTableDataBlock = pCmd->pDataBlocks->pData[0];
pTableDataBlock->size = sizeof(SShellSubmitBlock);
pTableDataBlock->size = sizeof(SSubmitBlk);
pTableDataBlock->rowSize = tinfo.rowSize;
numOfRows += pSql->res.numOfRows;
......
......@@ -325,12 +325,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
uint32_t dataSize = totalDataSize / alloced;
assert(dataSize * alloced == totalDataSize);
if (alloced == binded) {
totalDataSize += dataSize + sizeof(SShellSubmitBlock);
totalDataSize += dataSize + sizeof(SSubmitBlk);
if (totalDataSize > pBlock->nAllocSize) {
const double factor = 1.5;
void* tmp = realloc(pBlock->pData, (uint32_t)(totalDataSize * factor));
......@@ -342,7 +342,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
}
}
char* data = pBlock->pData + sizeof(SShellSubmitBlock) + dataSize * binded;
char* data = pBlock->pData + sizeof(SSubmitBlk) + dataSize * binded;
for (uint32_t j = 0; j < pBlock->numOfParams; ++j) {
SParamInfo* param = pBlock->params + j;
int code = doBindParam(data, param, bind + param->idx);
......@@ -365,10 +365,10 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size += totalDataSize / alloced;
SShellSubmitBlock* pSubmit = (SShellSubmitBlock*)pBlock->pData;
SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData;
pSubmit->numOfRows += pSubmit->numOfRows / alloced;
}
......@@ -398,10 +398,10 @@ static int insertStmtReset(STscStmt* pStmt) {
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock);
pBlock->size = sizeof(SShellSubmitBlock) + totalDataSize / alloced;
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced;
SShellSubmitBlock* pSubmit = (SShellSubmitBlock*)pBlock->pData;
SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData;
pSubmit->numOfRows = pSubmit->numOfRows / alloced;
}
}
......
......@@ -209,9 +209,14 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
assert(pQueryInfo->numOfTables == 0);
// assert(pQueryInfo->numOfTables == 0);
STableMetaInfo* pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = NULL;
if (pQueryInfo->numOfTables == 0) {
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
} else {
pTableMetaInfo = &pQueryInfo->pTableMetaInfo[0];
}
pCmd->command = pInfo->type;
......@@ -639,7 +644,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
* check invalid SQL:
* select tbname, tags_fields from super_table_name interval(1s)
*/
if (tscQueryMetricTags(pQueryInfo) && pQueryInfo->intervalTime > 0) {
if (tscQueryTags(pQueryInfo) && pQueryInfo->intervalTime > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
......@@ -746,7 +751,7 @@ int32_t setMeterID(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlO
tscClearMeterMetaInfo(pTableMetaInfo, false);
}
} else {
assert(pTableMetaInfo->pTableMeta == NULL && pTableMetaInfo->pMetricMeta == NULL);
assert(pTableMetaInfo->pTableMeta == NULL);
}
tfree(oldName);
......@@ -1252,7 +1257,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
if (tscQueryMetricTags(pQueryInfo)) { // local handle the metric tag query
if (tscQueryTags(pQueryInfo)) { // local handle the metric tag query
pCmd->count = numOfCols; // the number of meter schema, tricky.
pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS;
}
......@@ -1293,7 +1298,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
int16_t functionId = (int16_t)((colIdx >= numOfCols) ? TSDB_FUNC_TAGPRJ : TSDB_FUNC_PRJ);
if (functionId == TSDB_FUNC_TAGPRJ) {
addRequiredTagColumn(pQueryInfo, colIdx - numOfCols, tableIndex);
// addRequiredTagColumn(pQueryInfo, colIdx - numOfCols, tableIndex);
pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY;
} else {
pQueryInfo->type = TSDB_QUERY_TYPE_PROJECTION_QUERY;
......@@ -1396,8 +1401,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
SColumnList ids = {0};
ids.ids[0] = *pIndex;
// tag columns do not add to source list
ids.num = (j >= tscGetNumOfColumns(pTableMeta)) ? 0 : 1;
ids.num = 1;
insertResultField(pQueryInfo, startPos + j, &ids, pSchema[j].bytes, pSchema[j].type, pSchema[j].name, pExpr);
}
......@@ -1489,11 +1493,9 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes);
strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName));
// for point interpolation/last_row query, we need the timestamp column to be loaded
// for all querie, the timestamp column meeds to be loaded
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
tscColumnBaseInfoInsert(pQueryInfo, &index);
}
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr);
......@@ -1582,6 +1584,9 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
}
}
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
return TSDB_CODE_SUCCESS;
}
case TK_SUM:
......@@ -1690,6 +1695,9 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
}
}
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
return TSDB_CODE_SUCCESS;
}
case TK_FIRST:
......@@ -1708,7 +1716,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
}
/* in first/last function, multiple columns can be add to resultset */
for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) {
tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]);
if (pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) {
......@@ -1773,6 +1780,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
}
return TSDB_CODE_SUCCESS;
}
}
......@@ -1891,6 +1899,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
default:
return TSDB_CODE_INVALID_SQL;
}
}
// todo refactor
......@@ -2779,7 +2789,7 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString) {
return TSDB_CODE_SUCCESS;
}
static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/ SStringBuilder* sb) {
static int32_t tablenameListToString(tSQLExpr* pExpr, SStringBuilder* sb) {
tSQLExprList* pList = pExpr->pParam;
if (pList->nExpr <= 0) {
return TSDB_CODE_INVALID_SQL;
......@@ -2805,7 +2815,7 @@ static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/ SStringBuild
return TSDB_CODE_SUCCESS;
}
static int32_t tablenameCondToString(tSQLExpr* pExpr, /*char* str*/ SStringBuilder* sb) {
static int32_t tablenameCondToString(tSQLExpr* pExpr, SStringBuilder* sb) {
taosStringBuilderAppendStringLen(sb, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN);
taosStringBuilderAppendString(sb, pExpr->val.pz);
......@@ -3501,7 +3511,7 @@ int tableNameCompar(const void* lhs, const void* rhs) {
return ret > 0 ? 1 : -1;
}
static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr,
static int32_t setTableCondForSTableQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr,
int16_t tableCondIndex, SStringBuilder* sb) {
const char* msg = "table name too long";
......@@ -3730,7 +3740,7 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
return ret;
}
tsSetMetricQueryCond(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid, c);
tsSetSTableQueryCond(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid, c);
doCompactQueryExpr(pExpr);
tSQLExprDestroy(p1);
......@@ -3746,8 +3756,8 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql
return TSDB_CODE_SUCCESS;
}
const char* msg = "invalid filter expression";
const char* msg1 = "invalid expression";
const char* msg2 = "invalid filter expression";
int32_t ret = TSDB_CODE_SUCCESS;
......@@ -3805,11 +3815,11 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql
// 7. query condition for table name
pQueryInfo->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
ret = setTableCondForMetricQuery(pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
ret = setTableCondForSTableQuery(pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
taosStringBuilderDestroy(&sb);
if (!validateFilterExpr(pQueryInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg);
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
doAddJoinTagsColumnsIntoTagList(pQueryInfo, &condExpr);
......@@ -4234,7 +4244,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo))) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
} else if ((pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_COLUMN) &&
UTIL_TABLE_CREATE_FROM_STABLE(pTableMetaInfo)) {
UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
}
......@@ -4660,14 +4670,13 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
* And then launching multiple async-queries against all qualified virtual nodes, during the first-stage
* query operation.
*/
int32_t code = tscGetMetricMeta(pSql, clauseIndex);
int32_t code = tscGetSTableVgroupInfo(pSql, clauseIndex);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// No tables included. No results generated. Query results are empty.
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
if (pTableMetaInfo->pTableMeta == NULL || pMetricMeta == NULL || pMetricMeta->numOfTables == 0) {
if (pTableMetaInfo->pTableMeta == NULL) {
tscTrace("%p no table in metricmeta, no output result", pSql);
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
}
......@@ -5435,8 +5444,9 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
// two table: the first one is for current table, and the secondary is for the super table.
if (pQueryInfo->numOfTables < 2) {
tscAddEmptyMetaInfo(pQueryInfo);
assert(pQueryInfo->numOfTables == 2);
}
const int32_t TABLE_INDEX = 0;
const int32_t STABLE_INDEX = 1;
......@@ -5682,6 +5692,13 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
int32_t code = tscGetSTableVgroupInfo(pSql, index);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
// parse the group by clause in the first place
if (parseGroupbyClause(pQueryInfo, pQuerySql->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
......
......@@ -32,7 +32,7 @@ int32_t tscGetNumOfTags(const STableMeta* pTableMeta) {
}
if (pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert(tinfo.numOfTags > 0);
assert(tinfo.numOfTags >= 0);
return tinfo.numOfTags;
}
......@@ -51,14 +51,14 @@ int32_t tscGetNumOfColumns(const STableMeta* pTableMeta) {
SSchema *tscGetTableSchema(const STableMeta *pTableMeta) {
assert(pTableMeta != NULL);
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
STableMeta* pSTableMeta = pTableMeta->pSTable;
assert (pSTableMeta != NULL);
// if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
// STableMeta* pSTableMeta = pTableMeta->pSTable;
// assert (pSTableMeta != NULL);
//
// return pSTableMeta->schema;
// }
return pSTableMeta->schema;
}
return pTableMeta->schema;
return (SSchema*) pTableMeta->schema;
}
SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) {
......@@ -73,10 +73,12 @@ SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) {
STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
#if 0
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert (pTableMeta->pSTable != NULL);
return pTableMeta->pSTable->tableInfo;
}
#endif
return pTableMeta->tableInfo;
}
......@@ -130,11 +132,12 @@ SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t startCol)
assert(pTableMeta != NULL);
SSchema* pSchema = pTableMeta->schema;
#if 0
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert (pTableMeta->pSTable != NULL);
pSchema = pTableMeta->pSTable->schema;
}
#endif
return &pSchema[startCol];
}
......
......@@ -605,7 +605,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pTableMetaInfo->pMetricMeta->numOfVnodes);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * 1);
if (*pMemBuffer == NULL) {
tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -636,7 +636,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
for (int32_t i = 0; i < pTableMetaInfo->pMetricMeta->numOfVnodes; ++i) {
size_t numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList);
for (int32_t i = 0; i < numOfSubs; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
......
此差异已折叠。
......@@ -147,7 +147,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
}
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pObj;
}
......@@ -546,11 +546,11 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
* available, goes on
*/
if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
(!tscHasReachLimitation(pQueryInfo1, pRes1))) {
allSubqueryExhausted = false;
break;
}
// if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
// (!tscHasReachLimitation(pQueryInfo1, pRes1))) {
// allSubqueryExhausted = false;
// break;
// }
}
hasData = !allSubqueryExhausted;
......@@ -594,11 +594,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
}
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
......@@ -653,66 +649,9 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
return pRes->tsrow;
}
TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
return NULL;
}
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p data from all subqueries have been retrieved to client", pSql);
return tscBuildResFromSubqueries(pSql);
} else {
tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code);
return NULL;
}
} else if (pRes->row >= pRes->numOfRows) {
/**
* NOT a join query
*
* If the data block of current result set have been consumed already, try fetch next result
* data block from virtual node.
*/
tscResetForNextRetrieve(pRes);
if (pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql); // retrieve data from virtual node
// if failed to retrieve data from current virtual node, try next one if exists
if (hasMoreVnodesToTry(pSql)) {
tscTryQueryNextVnode(pSql, NULL);
}
/*
* local reducer has handle this case,
* so no need to add the pRes->numOfRows for super table query
*/
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
}
if (pRes->numOfRows == 0) {
return NULL;
}
}
return doSetResultRowData(pSql);
}
static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*) tres;
if (numOfRows < 0) {
// set the error code
if (numOfRows < 0) { // set the error code
pSql->res.code = -numOfRows;
}
......@@ -729,12 +668,14 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) {
if (pRes->qhandle == 0 ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) {
return NULL;
}
// current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows &&
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
......
......@@ -80,7 +80,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == 0 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
code = tscGetMetricMeta(pSql, 0);
code = tscGetSTableVgroupInfo(pSql, 0);
pSql->res.code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -504,7 +504,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
tsem_init(&pSql->rspSem, 0, 0);
tsem_init(&pSql->emptyRspSem, 0, 1);
SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr);
......
......@@ -178,11 +178,11 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
int numOfTables = 0;
if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
numOfTables += pVnodeSidList->numOfSids;
}
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// numOfTables += pVnodeSidList->numOfSids;
// }
}
SSubscriptionProgress* progress = (SSubscriptionProgress*)calloc(numOfTables, sizeof(SSubscriptionProgress));
......@@ -197,17 +197,17 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
progress[0].uid = uid;
progress[0].key = tscGetSubscriptionProgress(pSub, uid);
} else {
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
numOfTables = 0;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pTableMetaInfo->uid;
progress[numOfTables].uid = uid;
progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
}
}
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// numOfTables = 0;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
// STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
// int64_t uid = pTableMetaInfo->uid;
// progress[numOfTables].uid = uid;
// progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
// }
// }
qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
}
......
......@@ -476,7 +476,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode
int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t totalVnode = 0;
if ((++pTableMetaInfo->vnodeIndex) < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal);
......@@ -541,16 +542,16 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted
if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) {
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->numOfTotal = 1;
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
// if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) {
// pSupporter->pState->numOfCompleted = 0;
// pSupporter->pState->numOfTotal = 1;
//
// pSql->cmd.command = TSDB_SQL_SELECT;
// pSql->fp = tscJoinQueryCallback;
// tscProcessSql(pSql);
//
// return;
// }
}
int32_t numOfTotal = pSupporter->pState->numOfTotal;
......@@ -608,10 +609,10 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes &&
(!tscHasReachLimitation(pQueryInfo, pRes))) {
numOfFetch++;
}
// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes &&
// (!tscHasReachLimitation(pQueryInfo, pRes))) {
// numOfFetch++;
// }
} else {
if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pQueryInfo, pRes))) {
numOfFetch++;
......@@ -788,7 +789,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* data instead of returning to its invoker
*/
if (pTableMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes);
// assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value
pSql->fp = joinRetrieveCallback; // continue retrieve data
......@@ -1005,11 +1006,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
pRes->qhandle = 1; // hack the qhandle check
const uint32_t nBufferSize = (1 << 16); // 64KB
const uint32_t nBufferSize = (1u << 16); // 64KB
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t numOfSubQueries = taosArrayGetSize(pTableMetaInfo->vgroupIdList);
assert(numOfSubQueries > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
......@@ -1115,7 +1117,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
tfree(trsupport);
}
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
// set no disk space error info
......@@ -1137,7 +1139,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
pthread_mutex_unlock(&trsupport->queryMutex);
tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code);
tscRetrieveFromDnodeCallBack(trsupport, tres, trsupport->pState->code);
}
static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
......@@ -1232,81 +1234,18 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
}
}
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately
tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
return;
}
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
}
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
pPObj, pSql, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
return;
}
#ifdef _DEBUG_VIEW
printf("received data from vnode: %d rows\n", pRes->numOfRows);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret < 0) {
// set no disk space error info, and abort retry
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
} else {
pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
}
} else { // all data has been retrieved to client
/* data in from current vnode is stored in cache and disk */
uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
pSvd->vnode, numOfRowsFromVnode, idx);
// data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
// tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
// pSvd->vnode, numOfRowsFromSubquery, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
......@@ -1321,7 +1260,7 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
......@@ -1329,9 +1268,8 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR
// then used as an input of loser tree for disk-based merge routine
int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
pQueryInfo->groupbyExpr.orderType);
if (ret != 0) {
/* set no disk space error info, and abort retry */
return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
if (ret != 0) { // set no disk space error info, and abort retry
return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_CLI_NO_DISKSPACE);
}
// keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
......@@ -1373,6 +1311,81 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR
} else {
tscQueueAsyncRes(pPObj);
}
}
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately
tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
return;
}
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
}
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
// SVnodeSidList *vnodeInfo = 0;
// SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
// tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
// pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
pPObj, pSql, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
return;
}
#ifdef _DEBUG_VIEW
printf("received data from vnode: %d rows\n", pRes->numOfRows);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret < 0) { // set no disk space error info, and abort retry
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
} else if (pRes->completed) {
tscAllDataRetrievedFromDnode(trsupport, pSql);
} else { // continue fetch data from dnode
pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
} else { // all data has been retrieved to client
tscAllDataRetrievedFromDnode(trsupport, pSql);
}
}
......@@ -1409,10 +1422,10 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SVnodeSidList *vnodeInfo = NULL;
SVnodeDesc * pSvd = NULL;
if (pTableMetaInfo->pMetricMeta != NULL) {
vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
}
// if (pTableMetaInfo->pMetricMeta != NULL) {
// vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
// pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
// }
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
......@@ -1432,7 +1445,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
/*
* if a query on a vnode is failed, all retrieve operations from vnode that occurs later
* than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack
* than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
* function to abort current and remain retrieve process.
*
* NOTE: threadsafe is required.
......@@ -1453,7 +1466,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL);
// assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL);
tscProcessSql(pNew);
return;
}
......@@ -1470,7 +1483,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
trsupport->subqueryIndex, pState->code);
}
tscRetrieveFromVnodeCallBack(param, tres, pState->code);
tscRetrieveFromDnodeCallBack(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
......@@ -1481,7 +1494,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
trsupport->subqueryIndex);
}
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
}
......
......@@ -53,7 +53,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
const int32_t maxKeySize = TSDB_MAX_TAGS_LEN; // allowed max key size
SCond* cond = tsGetMetricQueryCondPos(pTagCond, uid);
SCond* cond = tsGetSTableQueryCondPos(pTagCond, uid);
char join[512] = {0};
if (pTagCond->joinInfo.hasJoin) {
......@@ -92,7 +92,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
free(tmp);
}
SCond* tsGetMetricQueryCondPos(STagCond* pTagCond, uint64_t uid) {
SCond* tsGetSTableQueryCondPos(STagCond* pTagCond, uint64_t uid) {
for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) {
if (uid == pTagCond->cond[i].uid) {
return &pTagCond->cond[i];
......@@ -102,7 +102,8 @@ SCond* tsGetMetricQueryCondPos(STagCond* pTagCond, uint64_t uid) {
return NULL;
}
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) {
// todo refactor by using SArray
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) {
size_t len = strlen(str);
if (len == 0) {
return;
......@@ -122,7 +123,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) {
(pCmd->msgType == TSDB_MSG_TYPE_QUERY);
}
bool tscQueryMetricTags(SQueryInfo* pQueryInfo) {
bool tscQueryTags(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
if (tscSqlExprGet(pQueryInfo, i)->functionId != TSDB_FUNC_TAGPRJ) {
return false;
......@@ -172,6 +173,7 @@ void tscGetDBInfoFromMeterId(char* tableId, char* db) {
}
SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx) {
#if 0
if (pMetricmeta == NULL) {
tscError("illegal metricmeta");
return 0;
......@@ -189,6 +191,8 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx
}
return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta);
#endif
}
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
......@@ -221,12 +225,12 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
// for select query super table, the metricmeta can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->pMetricMeta != NULL);
// assert(pTableMetaInfo->pMetricMeta != NULL);
}
if (pTableMetaInfo->pMetricMeta == NULL) {
return false;
}
// if (pTableMetaInfo->pMetricMeta == NULL) {
// return false;
// }
if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
return false;
......@@ -259,12 +263,12 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
}
// only query on tag, not a projection query
if (tscQueryMetricTags(pQueryInfo)) {
if (tscQueryTags(pQueryInfo)) {
return false;
}
// for project query, only the following two function is allowed
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
......@@ -423,9 +427,6 @@ void tscFreeResData(SSqlObj* pSql) {
}
void tscFreeSqlResult(SSqlObj* pSql) {
//TODO not free
return;
tfree(pSql->res.pRsp);
pSql->res.row = 0;
pSql->res.numOfRows = 0;
......@@ -469,8 +470,6 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
tscFreeSqlCmdData(pCmd);
tscTrace("%p free sqlObj partial completed", pSql);
tscFreeSqlCmdData(pCmd);
}
void tscFreeSqlObj(SSqlObj* pSql) {
......@@ -489,10 +488,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pCmd->allocSize = 0;
if (pSql->fp == NULL) {
tsem_destroy(&pSql->rspSem);
tsem_destroy(&pSql->emptyRspSem);
}
free(pSql);
}
......@@ -695,6 +691,49 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
return TSDB_CODE_SUCCESS;
}
static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
int32_t firstPartLen = 0;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
pDataBlock += sizeof(SSubmitBlk);
int32_t total = sizeof(int32_t)*2;
for(int32_t i = 0; i < tinfo.numOfColumns; ++i) {
switch (pSchema[i].type) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY: {
assert(0); // not support binary yet
firstPartLen += sizeof(int32_t);break;
}
default:
firstPartLen += tDataTypeDesc[pSchema[i].type].nSize;
total += tDataTypeDesc[pSchema[i].type].nSize;
}
}
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
SSubmitBlk* pBlock = (SSubmitBlk*) pTableDataBlock->pData;
int32_t rows = htons(pBlock->numOfRows);
for(int32_t i = 0; i < rows; ++i) {
*(int32_t*) pDataBlock = total;
pDataBlock += sizeof(int32_t);
*(int32_t*) pDataBlock = firstPartLen;
pDataBlock += sizeof(int32_t);
memcpy(pDataBlock, p, pTableDataBlock->rowSize);
p += pTableDataBlock->rowSize;
pDataBlock += pTableDataBlock->rowSize;
}
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd;
......@@ -716,7 +755,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
return ret;
}
int64_t destSize = dataBuf->size + pOneTableBlock->size;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pOneTableBlock->size*sizeof(int32_t)*2;
if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
......@@ -730,29 +769,33 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize);
taosHashCleanup(pVnodeDataBlockHashList);
tfree(dataBuf->pData);
tscDestroyBlockArrayList(pVnodeDataBlockList);
tfree(dataBuf->pData);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
}
SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData;
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
sortRemoveDuplicates(pOneTableBlock);
char* e = (char*)pBlocks->payLoad + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, pBlocks->sid,
pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->payLoad), GET_INT64_VAL(e));
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(e));
pBlocks->sid = htonl(pBlocks->sid);
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2);
pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows);
memcpy(dataBuf->pData + dataBuf->size, pOneTableBlock->pData, pOneTableBlock->size);
pBlocks->len = htonl(len);
dataBuf->size += pOneTableBlock->size;
// erase the empty space reserved for binary data
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
dataBuf->size += (len + sizeof(SSubmitBlk));
dataBuf->numOfTables += 1;
}
......@@ -1704,18 +1747,10 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
}
int32_t command = pSql->cmd.command;
if (pTscObj->pSql == pSql) {
/*
* in case of taos_connect_a query, the object should all be released, even it is the
* master sql object. Otherwise, the master sql should not be released
*/
if (command == TSDB_SQL_CONNECT && pSql->res.code != TSDB_CODE_SUCCESS) {
if (command == TSDB_SQL_CONNECT) {
return true;
}
return false;
}
if (command == TSDB_SQL_INSERT) {
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1894,7 +1929,7 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
}
pTableMetaInfo->pTableMeta = pTableMeta;
pTableMetaInfo->pMetricMeta = pMetricMeta;
// pTableMetaInfo->pMetricMeta = pMetricMeta;
pTableMetaInfo->numOfTags = numOfTags;
if (tags != NULL) {
......@@ -1944,7 +1979,7 @@ void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
}
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache);
// taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache);
}
void tscResetForNextRetrieve(SSqlRes* pRes) {
......@@ -2083,15 +2118,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex);
// pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags,
// pTableMetaInfo->tagColumnIndex);
}
assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
assert(pFinalInfo->pMetricMeta != NULL);
// assert(pFinalInfo->pMetricMeta != NULL);
}
tscTrace(
......@@ -2191,13 +2226,13 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
return false;
}
// }
int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1);
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1);
}
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
......@@ -2213,7 +2248,8 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t totalVnode = 0;
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
while (++pTableMetaInfo->vnodeIndex < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
......
#include <gtest/gtest.h>
#include <cassert>
#include <iostream>
#include "taos.h"
#include "tstoken.h"
#include "ttime.h"
#include "tutil.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
extern void deltaToUtcInitOnce();
/* test parse time function */
TEST(testCase, parse_time) {
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
deltaToUtcInitOnce();
char t1[] = "2018-1-1 1:1:1.952798";
char t13[] = "1970-1-1 0:0:0";
int64_t time = 0, time1 = 0;
taosParseTime(t1, &time, strlen(t1), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 1514739661952);
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, timezone * MILLISECOND_PER_SECOND);
char t2[] = "2018-1-1T1:1:1.952Z";
taosParseTime(t2, &time, strlen(t2), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 1514739661952 + 28800000);
char t3[] = "2018-1-1 1:01:01.952";
taosParseTime(t3, &time, strlen(t3), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 1514739661952);
char t4[] = "2018-1-1 1:01:01.9";
char t5[] = "2018-1-1 1:01:1.900";
char t6[] = "2018-01-01 1:1:1.90";
char t7[] = "2018-01-01 01:01:01.9";
char t8[] = "2018-01-01 01:01:01.9007865";
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t5, &time1, strlen(t5), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, time1);
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t6, &time1, strlen(t6), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, time1);
taosParseTime(t4, &time, strlen(t4), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t7, &time1, strlen(t7), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, time1);
taosParseTime(t5, &time, strlen(t5), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t8, &time1, strlen(t8), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, time1);
char t9[] = "2017-4-3 1:1:2.980";
char t10[] = "2017-4-3T2:1:2.98+9:00";
taosParseTime(t9, &time, strlen(t9), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t10, &time1, strlen(t10), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, time1);
char t11[] = "2017-4-3T2:1:2.98+09:00";
taosParseTime(t11, &time, strlen(t11), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t10, &time1, strlen(t10), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, time1);
char t12[] = "2017-4-3T2:1:2.98+0900";
taosParseTime(t11, &time, strlen(t11), TSDB_TIME_PRECISION_MILLI);
taosParseTime(t12, &time1, strlen(t12), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, time1);
taos_options(TSDB_OPTION_TIMEZONE, "UTC");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 0);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Shanghai");
deltaToUtcInitOnce();
char t14[] = "1970-1-1T0:0:0Z";
taosParseTime(t14, &time, strlen(t14), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 0);
char t40[] = "1970-1-1 0:0:0.999999999";
taosParseTime(t40, &time, strlen(t40), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 999 + timezone * MILLISECOND_PER_SECOND);
char t41[] = "1997-1-1 0:0:0.999999999";
taosParseTime(t41, &time, strlen(t41), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 852048000999);
int64_t k = timezone;
char t42[] = "1997-1-1T0:0:0.999999999Z";
taosParseTime(t42, &time, strlen(t42), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 852048000999 - timezone * MILLISECOND_PER_SECOND);
////////////////////////////////////////////////////////////////////
// illegal timestamp format
char t15[] = "2017-12-33 0:0:0";
EXPECT_EQ(taosParseTime(t15, &time, strlen(t15), TSDB_TIME_PRECISION_MILLI), -1);
char t16[] = "2017-12-31 99:0:0";
EXPECT_EQ(taosParseTime(t16, &time, strlen(t16), TSDB_TIME_PRECISION_MILLI), -1);
char t17[] = "2017-12-31T9:0:0";
EXPECT_EQ(taosParseTime(t17, &time, strlen(t17), TSDB_TIME_PRECISION_MILLI), -1);
char t18[] = "2017-12-31T9:0:0.Z";
EXPECT_EQ(taosParseTime(t18, &time, strlen(t18), TSDB_TIME_PRECISION_MILLI), -1);
char t19[] = "2017-12-31 9:0:0.-1";
EXPECT_EQ(taosParseTime(t19, &time, strlen(t19), TSDB_TIME_PRECISION_MILLI), -1);
char t20[] = "2017-12-31 9:0:0.1+12:99";
EXPECT_EQ(taosParseTime(t20, &time, strlen(t20), TSDB_TIME_PRECISION_MILLI), 0);
EXPECT_EQ(time, 1514682000100);
char t21[] = "2017-12-31T9:0:0.1+12:99";
EXPECT_EQ(taosParseTime(t21, &time, strlen(t21), TSDB_TIME_PRECISION_MILLI), -1);
char t22[] = "2017-12-31 9:0:0.1+13:1";
EXPECT_EQ(taosParseTime(t22, &time, strlen(t22), TSDB_TIME_PRECISION_MILLI), 0);
char t23[] = "2017-12-31T9:0:0.1+13:1";
EXPECT_EQ(taosParseTime(t23, &time, strlen(t23), TSDB_TIME_PRECISION_MILLI), 0);
//======================== add some case ============================//
char b1[] = "9999-12-31 23:59:59.999";
taosParseTime(b1, &time, strlen(b1), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 253402271999999);
char b2[] = "2020-01-01 01:01:01.321";
taosParseTime(b2, &time, strlen(b2), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 1577811661321);
taos_options(TSDB_OPTION_TIMEZONE, "America/New_York");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, 18000 * MILLISECOND_PER_SECOND);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Tokyo");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, -32400 * MILLISECOND_PER_SECOND);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Shanghai");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(time, -28800 * MILLISECOND_PER_SECOND);
}
......@@ -81,11 +81,13 @@ STSchema *tdDecodeSchema(void **psrc);
*/
typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
#define dataRowLen(r) (*(int32_t *)(r))
#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t)))
#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
#define dataRowIdx(r, i) ((char *)(r) + i)
......@@ -103,11 +105,36 @@ SDataRow tdDataRowDup(SDataRow row);
// ----------------- Data column structure
typedef struct SDataCol {
int64_t len;
char data[];
int8_t type;
int16_t colId;
int bytes;
int len;
int offset;
void * pData;
} SDataCol;
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter);
typedef struct {
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int numOfPoints;
int numOfCols; // Total number of cols
int sversion; // TODO: set sversion
void * buf;
SDataCol cols[];
} SDataCols;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsKeyAt(pCols, idx) ((int64_t *)(keyCol(pCols)->pData))[(idx)]
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0)
#define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1)
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols);
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
#ifdef __cplusplus
}
......
......@@ -150,7 +150,7 @@ void tdFreeSchema(STSchema *pSchema) {
*/
void tdUpdateSchema(STSchema *pSchema) {
STColumn *pCol = NULL;
int32_t offset = 0;
int32_t offset = TD_DATA_ROW_HEAD_SIZE;
for (int i = 0; i < schemaNCols(pSchema); i++) {
pCol = schemaColAt(pSchema, i);
colSetOffset(pCol, offset);
......@@ -294,14 +294,79 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow;
}
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) {
int row = *iter;
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * maxCols);
if (pCols == NULL) return NULL;
pCols->maxRowSize = maxRowSize;
pCols->maxCols = maxCols;
pCols->maxPoints = maxRows;
pCols->buf = malloc(maxRowSize * maxRows);
if (pCols->buf == NULL) {
free(pCols);
return NULL;
}
return pCols;
}
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
// assert(schemaNCols(pSchema) <= pCols->numOfCols);
tdResetDataCols(pCols);
pCols->numOfCols = schemaNCols(pSchema);
pCols->cols[0].pData = pCols->buf;
for (int i = 0; i < schemaNCols(pSchema); i++) {
// TODO
if (i > 0) {
pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints;
}
pCols->cols[i].type = colType(schemaColAt(pSchema, i));
pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i));
pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i));
pCols->cols[i].colId = colColId(schemaColAt(pSchema, i));
}
*iter = row + 1;
return pCols;
}
void tdFreeDataCols(SDataCols *pCols) {
if (pCols) {
if (pCols->buf) free(pCols->buf);
free(pCols);
}
}
void tdResetDataCols(SDataCols *pCols) {
pCols->numOfPoints = 0;
for (int i = 0; i < pCols->maxCols; i++) {
pCols->cols[i].len = 0;
}
}
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
TSKEY key = dataRowKey(row);
for (int i = 0; i < pCols->numOfCols; i++) {
SDataCol *pCol = pCols->cols + i;
memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes);
pCol->len += pCol->bytes;
}
pCols->numOfPoints++;
}
// Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
int pointsLeft = pCols->numOfPoints - pointsToPop;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *p_col = pCols->cols + iCol;
if (p_col->len > 0) {
p_col->len = TYPE_BYTES[p_col->type] * pointsLeft;
if (pointsLeft > 0) {
memmove((void *)(p_col->pData), (void *)((char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop), p_col->len);
}
}
}
pCols->numOfPoints = pointsLeft;
}
/**
......
......@@ -13,7 +13,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http tsdb)
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb)
#IF (TD_CLUSTER)
# TARGET_LINK_LIBRARIES(taosd dcluster)
......
......@@ -225,17 +225,17 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker(&vnodeObj);
vnodeObj.rworker = dnodeAllocateReadWorker(&vnodeObj);
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
vnodeObj.events = NULL;
vnodeObj.cq = NULL;
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
dTrace("open vnode:%d in %s", vnodeObj.vgId, rootDir);
dTrace("open vnode:%d in %s", pVnode->vgId, rootDir);
return TSDB_CODE_SUCCESS;
}
......@@ -314,17 +314,17 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker(&vnodeObj);
vnodeObj.rworker = dnodeAllocateReadWorker(&vnodeObj);
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
vnodeObj.events = NULL;
vnodeObj.cq = NULL;
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
dPrint("vgroup:%d, vnode:%d is created", vnodeObj.vgId, vnodeObj.vgId);
dPrint("vgroup:%d, vnode:%d is created", pVnode->vgId, pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
......
......@@ -77,6 +77,8 @@ void dnodeRead(SRpcMsg *pMsg) {
char *pCont = (char *) pMsg->pCont;
SRpcContext *pRpcContext = NULL;
dTrace("dnode read msg disposal");
// SMsgDesc *pDesc = pCont;
// pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
// pCont += sizeof(SMsgDesc);
......@@ -90,8 +92,8 @@ void dnodeRead(SRpcMsg *pMsg) {
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = 1; //htonl(pHead->vgId);
pHead->contLen = pMsg->contLen; //htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(pHead->vgId);
if (pVnode == NULL) {
......@@ -162,7 +164,7 @@ static void *dnodeProcessReadQueue(void *param) {
void *pVnode;
while (1) {
if (taosReadQitemFromQset(qset, &type, &pReadMsg, &pVnode) == 0) {
if (taosReadQitemFromQset(qset, &type, (void **)&pReadMsg, (void **)&pVnode) == 0) {
dnodeHandleIdleReadWorker();
continue;
}
......@@ -174,7 +176,7 @@ static void *dnodeProcessReadQueue(void *param) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
dnodeProcessReadResult(pVnode, pReadMsg);
// dnodeProcessReadResult(pVnode, pReadMsg);
taosFreeQitem(pReadMsg);
dnodeReleaseVnode(pVnode);
......@@ -218,19 +220,38 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
code = terrno;
}
//TODO: query handle is returned by dnodeProcessQueryMsg
if (0) {
SRpcMsg rsp;
rsp.handle = pRead->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
}
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
}
static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->pRpcContext = pMsg->pRpcContext;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL;
int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo);
if (pMsg->contLen != 0) {
void* tsdb = dnodeGetVnodeTsdb(pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
......@@ -244,62 +265,47 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
.msgType = 0
};
// do execute query
qTableQuery(pQInfo);
rpcSendResponse(&rpcRsp);
} else {
pQInfo = pMsg->pCont;
}
qTableQuery(pQInfo); // do execute query
}
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = htobe64(pRetrieve->qhandle);
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("retrieve msg is disposed, qInfo:%p", pQInfo);
int32_t rowSize = 0;
int32_t numOfRows = 0;
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
int32_t contLen = 0;
SRpcMsg rpcRsp = {0};
SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
//todo free qinfo
pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
} else {
contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
// todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
*(int64_t*) pRsp->data = 1000;
if (qHasMoreResultsToRetrieve(pQInfo)) {
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode
dnodeProcessReadResult(pVnode, pMsg);
}
}
rpcRsp = (SRpcMsg) {
SRpcMsg rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
}
rpcSendResponse(&rpcRsp);
}
......@@ -53,12 +53,12 @@ typedef struct _thread_obj {
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *);
static void *dnodeProcessWriteQueue(void *param);
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
static void dnodeProcessWriteResult(SWriteMsg *pWrite);
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg);
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg);
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg);
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg);
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg);
static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite);
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg);
SWriteWorkerPool wWorkerPool;
......@@ -193,9 +193,9 @@ static void *dnodeProcessWriteQueue(void *param) {
continue;
}
for (int32_t i=0; i<numOfMsgs; ++i) {
for (int32_t i = 0; i < numOfMsgs; ++i) {
// retrieve all items, and write them into WAL
taosGetQitem(qall, &type, &pWriteMsg);
taosGetQitem(qall, &type, (void **)&pWriteMsg);
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen);
}
......@@ -206,7 +206,7 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one
taosResetQitems(qall);
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, &type, &pWriteMsg);
taosGetQitem(qall, &type, (void **)&pWriteMsg);
terrno = 0;
if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) {
......@@ -218,7 +218,6 @@ static void *dnodeProcessWriteQueue(void *param) {
dnodeProcessWriteResult(pVnode, pWriteMsg);
taosFreeQitem(pWriteMsg);
}
}
taosFreeQall(qall);
......@@ -270,7 +269,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
}
}
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) {
dTrace("submit msg is disposed");
SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg));
......@@ -279,7 +278,10 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0;
// todo write to tsdb
void* tsdb = dnodeGetVnodeTsdb(pVnode);
assert(tsdb != NULL);
tsdbInsertData(tsdb, pMsg->pCont);
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
......@@ -288,10 +290,11 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
.code = 0,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
......@@ -340,16 +343,16 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
dnodeReleaseVnode(pMsg->pVnode);
dnodeReleaseVnode(pVnode);
dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
......@@ -359,16 +362,16 @@ static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
.tid = htonl(pTable->sid)
};
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbDropTable(pTsdb, tableId);
dnodeReleaseVnode(pMsg->pVnode);
dnodeReleaseVnode(pVnode);
dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) {
static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
......@@ -417,16 +420,16 @@ static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) {
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
dnodeReleaseVnode(pMsg->pVnode);
dnodeReleaseVnode(pVnode);
dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
......@@ -438,7 +441,7 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
rpcRsp.code = TSDB_CODE_SUCCESS;
dnodeReleaseVnode(pMsg->pVnode);
dnodeReleaseVnode(pVnode);
dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
......
......@@ -25,7 +25,6 @@ extern "C" {
#include "taosdef.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "sdb.h"
#include "tglobalcfg.h"
#include "thash.h"
#include "tidpool.h"
......@@ -54,8 +53,8 @@ typedef struct {
int8_t numOfMnodes;
int32_t numOfDnodes;
char mnodeName[TSDB_DNODE_NAME_LEN + 1];
char reserved[7];
char updateEnd[1];
int8_t reserved[15];
int8_t updateEnd[1];
int syncFd;
void *hbTimer;
void *pSync;
......@@ -79,8 +78,8 @@ typedef struct {
float lbScore; // calc in balance function
int32_t customScore; // config by user
char dnodeName[TSDB_DNODE_NAME_LEN + 1];
char reserved[7];
char updateEnd[1];
int8_t reserved[15];
int8_t updateEnd[1];
SVnodeLoad vload[TSDB_MAX_VNODES];
int32_t status;
uint32_t lastReboot; // time stamp for last reboot
......@@ -102,63 +101,43 @@ typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
} STableInfo;
struct _vg_obj;
typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
STableInfo info;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved[5];
int8_t reserved[15];
int8_t updateEnd[1];
int32_t numOfTables;
int16_t nextColId;
SSchema *schema;
SSchema * schema;
} SSuperTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
STableInfo info;
uint64_t uid;
int64_t createdTime;
int32_t sversion; //used by normal table
int32_t numOfColumns; //used by normal table
int32_t sid;
int32_t vgId;
int64_t createdTime;
char superTableId[TSDB_TABLE_ID_LEN + 1];
int32_t sqlLen;
int8_t reserved[1];
int8_t updateEnd[1];
int16_t nextColId; //used by normal table
char* sql; //used by normal table
SSchema* schema; //used by normal table
SSuperTableObj *superTable;
} SChildTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t sqlLen;
int8_t reserved[3];
int8_t updateEnd[1];
char* sql; //null-terminated string
int16_t nextColId;
SSchema* schema;
} SNormalTableObj;
struct _db_obj;
typedef struct _vg_obj {
uint32_t vgId;
......@@ -166,15 +145,16 @@ typedef struct _vg_obj {
int64_t createdTime;
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
int32_t numOfVnodes;
int32_t numOfTables;
int32_t lbIp;
int32_t lbTime;
int8_t lbStatus;
int8_t reserved[14];
int8_t updateEnd[1];
struct _vg_obj *prev, *next;
struct _db_obj *pDb;
int32_t numOfTables;
void * idPool;
STableInfo ** tableList;
SChildTableObj ** tableList;
} SVgObj;
typedef struct _db_obj {
......@@ -182,8 +162,8 @@ typedef struct _db_obj {
int8_t dirty;
int64_t createdTime;
SDbCfg cfg;
char reserved[15];
char updateEnd[1];
int8_t reserved[15];
int8_t updateEnd[1];
struct _db_obj *prev, *next;
int32_t numOfVgroups;
int32_t numOfTables;
......@@ -201,7 +181,7 @@ typedef struct _user_obj {
int64_t createdTime;
int8_t superAuth;
int8_t writeAuth;
int8_t reserved[16];
int8_t reserved[13];
int8_t updateEnd[1];
struct _user_obj *prev, *next;
struct _acctObj * pAcct;
......@@ -258,9 +238,10 @@ typedef struct {
typedef struct {
uint8_t msgType;
int8_t expected;
int8_t usePublicIp;
int8_t received;
int8_t successed;
int8_t expected;
int32_t contLen;
int32_t code;
void *ahandle;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SDB_H
#define TDENGINE_SDB_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
#include "taosdef.h"
extern uint16_t tsMgmtMgmtPort;
extern uint16_t tsMgmtSyncPort;
extern int tsMgmtPeerHBTimer; // seconds
extern char * sdbStatusStr[];
extern char * sdbRoleStr[];
extern int sdbMaster;
extern SRpcIpSet *pSdbIpList;
extern SRpcIpSet *pSdbPublicIpList;
extern void (*sdbWorkAsMasterCallback)(); // this function pointer will be set by taosd
enum _keytype {
SDB_KEYTYPE_STRING, SDB_KEYTYPE_UINT32, SDB_KEYTYPE_AUTO, SDB_KEYTYPE_RECYCLE, SDB_KEYTYPE_MAX
};
#define SDB_ROLE_UNAPPROVED 0
#define SDB_ROLE_UNDECIDED 1
#define SDB_ROLE_MASTER 2
#define SDB_ROLE_SLAVE 3
#define SDB_STATUS_OFFLINE 0
#define SDB_STATUS_UNSYNCED 1
#define SDB_STATUS_SYNCING 2
#define SDB_STATUS_SERVING 3
#define SDB_STATUS_DELETED 4
enum _sdbaction {
SDB_TYPE_INSERT,
SDB_TYPE_DELETE,
SDB_TYPE_UPDATE,
SDB_TYPE_DECODE,
SDB_TYPE_ENCODE,
SDB_TYPE_BEFORE_BATCH_UPDATE,
SDB_TYPE_BATCH_UPDATE,
SDB_TYPE_AFTER_BATCH_UPDATE,
SDB_TYPE_RESET,
SDB_TYPE_DESTROY,
SDB_MAX_ACTION_TYPES
};
#define SDB_MAX_PEERS 4
typedef struct {
uint32_t ip;
uint32_t publicIp;
char ipstr[20];
char zone[12];
char role;
int64_t createdTime;
uint64_t dbVersion;
int64_t lostTime;
char status;
char numOfMnodes;
int numOfDnodes;
char updateEnd[1];
// internal
int syncFd;
void *hbTimer;
void *pSync;
} SSdbPeer;
extern SSdbPeer *sdbPeer[];
#define sdbInited (sdbPeer[0])
#define sdbStatus (sdbPeer[0]->status)
void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory,
void *(*appTool)(char, void *, char *, int, int *));
void *sdbGetRow(void *handle, void *key);
int64_t sdbInsertRow(void *handle, void *row, int rowSize);
int sdbDeleteRow(void *handle, void *key);
int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated);
void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
int sdbBatchUpdateRow(void *handle, void *row, int rowSize);
int64_t sdbGetId(void *handle);
int64_t sdbGetNumOfRows(void *handle);
void sdbSaveSnapShot(void *handle);
void sdbCloseTable(void *handle);
int sdbRemovePeerByIp(uint32_t ip);
int sdbInitPeers(char *directory);
void sdbCleanUpPeers();
int64_t sdbGetVersion();
int32_t sdbGetRunStatus();
#define TSDB_MAX_NORMAL_TABLES 10000
#define TSDB_MAX_SUPER_TABLES 1000
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_SDB_H
......@@ -176,7 +176,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_COLUMNS 256
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns
#define TSDB_DNODE_NAME_LEN 63
#define TSDB_DNODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 192
#define TSDB_DB_NAME_LEN 32
#define TSDB_COL_NAME_LEN 64
......@@ -309,6 +309,16 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SESSIONS_PER_VNODE (300)
#define TSDB_SESSIONS_PER_DNODE (TSDB_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
#define TSDB_MAX_MNODES 5
#define TSDB_MAX_DNODES 10
#define TSDB_MAX_ACCOUNTS 10
#define TSDB_MAX_USERS 20
#define TSDB_MAX_DBS 100
#define TSDB_MAX_VGROUPS 1000
#define TSDB_MAX_SUPER_TABLES 100
#define TSDB_MAX_NORMAL_TABLES 1000
#define TSDB_MAX_CHILD_TABLES 100000
enum {
TSDB_PRECISION_MILLI,
TSDB_PRECISION_MICRO,
......
......@@ -20,8 +20,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "taoserror.h"
......@@ -94,8 +94,8 @@ extern "C" {
#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62
#define TSDB_MSG_TYPE_CM_TABLE_META 63
#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64
#define TSDB_MSG_TYPE_CM_STABLE_META 65
#define TSDB_MSG_TYPE_CM_STABLE_META_RSP 66
#define TSDB_MSG_TYPE_CM_STABLE_VGROUP 65
#define TSDB_MSG_TYPE_CM_STABLE_VGROUP_RSP 66
#define TSDB_MSG_TYPE_CM_TABLES_META 67
#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68
#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69
......@@ -188,15 +188,6 @@ extern char *taosMsg[];
#pragma pack(push, 1)
typedef struct {
int32_t vnode;
int32_t sid;
int32_t sversion;
uint64_t uid;
int16_t numOfRows;
char payLoad[];
} SShellSubmitBlock;
typedef struct {
int32_t numOfVnodes;
} SMsgDesc;
......@@ -206,13 +197,25 @@ typedef struct SMsgHead {
int32_t vgId;
} SMsgHead;
typedef struct {
SMsgDesc desc;
// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t len; // data part length, not including the SSubmitBlk head
int16_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
// Submit message for this TSDB
typedef struct SSubmitMsg {
SMsgHead header;
int16_t import;
int32_t numOfTables; // total number of sid
char blks[]; // number of data blocks, each table has at least one data block
} SShellSubmitMsg;
int32_t length;
int32_t compressed : 2;
int32_t numOfBlocks : 30;
SSubmitBlk blocks[];
} SSubmitMsg;
typedef struct {
int32_t index; // index of failed block in submit blocks
......@@ -238,7 +241,7 @@ typedef struct SSchema {
} SSchema;
typedef struct {
int32_t vnode; //the index of vnode
int32_t vnode; // the index of vnode
uint32_t ip;
} SVnodeDesc;
......@@ -452,47 +455,31 @@ typedef struct STimeWindow {
* the outputCols will be 3 while the numOfCols is 1.
*/
typedef struct {
int32_t contLen; // msg header
int16_t vgId;
int32_t numOfTables;
uint64_t uid;
SMsgHead head;
STimeWindow window;
int32_t numOfTables;
int16_t order;
int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int64_t intervalTime; // time interval for aggregation, in million second
int64_t intervalOffset; // start offset for interval query
int64_t slidingTime; // value for sliding window
// tag schema, used to parse tag information in pSidExtInfo
uint64_t pTagSchema;
int16_t numOfTagsCols; // required number of tags
int16_t tagLength; // tag length in current query
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
uint16_t tagCondLen; // tag length in current query
uint16_t nameCondLen; // table name in/like query expression string length
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
uint64_t groupbyTagIds;
int64_t limit;
int64_t offset;
int16_t queryType; // denote another query process
uint16_t queryType; // denote another query process
int16_t numOfOutputCols; // final output columns numbers
int16_t interpoType; // interpolate type
uint64_t defaultVal; // default value array list
int32_t colNameLen;
int64_t colNameList;
int64_t pSqlFuncExprs;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
......@@ -506,12 +493,14 @@ typedef struct {
} SQueryTableRsp;
typedef struct {
SMsgHead header;
uint64_t qhandle;
uint16_t free;
} SRetrieveTableMsg;
typedef struct {
typedef struct SRetrieveTableRsp {
int32_t numOfRows;
int8_t completed; // all results are returned to client
int16_t precision;
int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds;
......@@ -625,14 +614,14 @@ typedef struct {
char tableIds[];
} SCMMultiTableInfoMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
} SCMSuperTableInfoMsg;
typedef struct SCMSTableVgroupMsg {
char tableId[TSDB_TABLE_ID_LEN];
} SCMSTableVgroupMsg;
typedef struct {
int32_t numOfDnodes;
uint32_t dnodeIps[];
} SCMSuperTableInfoRsp;
} SCMSTableVgroupRspMsg;
typedef struct {
int16_t elemLen;
......@@ -678,14 +667,15 @@ typedef struct {
} SSuperTableMeta;
typedef struct STableMetaMsg {
char tableId[TSDB_TABLE_ID_LEN]; // note: This field must be at the front
int32_t contLen;
char tableId[TSDB_TABLE_ID_LEN]; // table id
char stableId[TSDB_TABLE_ID_LEN]; // stable name if it is created according to super table
uint8_t numOfTags;
uint8_t precision;
uint8_t tableType;
int16_t numOfColumns;
int16_t sversion;
int8_t numOfVpeers;
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t sid;
......@@ -733,8 +723,8 @@ typedef struct {
} SDMConfigTableMsg;
typedef struct {
uint32_t dnode;
int32_t vnode;
uint32_t dnodeId;
int32_t vgId;
} SDMConfigVnodeMsg;
typedef struct {
......
......@@ -2,5 +2,5 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
ADD_SUBDIRECTORY(shell)
ADD_SUBDIRECTORY(taosdemo)
ADD_SUBDIRECTORY(taosdump)
#ADD_SUBDIRECTORY(taosdemo)
#ADD_SUBDIRECTORY(taosdump)
......@@ -13,7 +13,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(mnode ${SRC})
TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread)
TARGET_LINK_LIBRARIES(mnode trpc tutil pthread)
IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(mnode)
......
......@@ -23,22 +23,18 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
#include "taosdef.h"
#include "mnode.h"
int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId);
void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable);
int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void mgmtCreateChildTable(SQueuedMsg *pMsg);
void mgmtDropChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable);
void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable);
void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable);
void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable);
#ifdef __cplusplus
}
......
......@@ -22,22 +22,23 @@ extern "C" {
#include "mnode.h"
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup);
// api
int32_t mgmtInitDbs();
void mgmtCleanUpDbs();
SDbObj *mgmtGetDb(char *db);
SDbObj *mgmtGetDbByTableId(char *db);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
// util func
void mgmtAddSuperTableIntoDb(SDbObj *pDb);
void mgmtRemoveSuperTableFromDb(SDbObj *pDb);
void mgmtAddTableIntoDb(SDbObj *pDb);
void mgmtRemoveTableFromDb(SDbObj *pDb);
void mgmtAddVgroupIntoDb(SVgObj *pVgroup);
void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup);
void mgmtRemoveVgroupFromDb(SVgObj *pVgroup);
void mgmtMoveVgroupToTail(SVgObj *pVgroup);
void mgmtMoveVgroupToHead(SVgObj *pVgroup);
#ifdef __cplusplus
}
......
......@@ -20,9 +20,15 @@
extern "C" {
#endif
bool mgmtCheckRedirect(void *handle);
int32_t mgmtInitMnodes();
void mgmtCleanupMnodes();
bool mgmtInServerStatus();
bool mgmtIsMaster();
void mgmtGetMnodeIpList(SRpcIpSet *ipSet);
bool mgmtCheckRedirect(void *handle);
void mgmtGetMnodePrivateIpList(SRpcIpSet *ipSet);
void mgmtGetMnodePublicIpList(SRpcIpSet *ipSet);
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp);
int32_t mgmtRemoveMnode(uint32_t privateIp);
......
......@@ -28,6 +28,8 @@ bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle);
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -13,31 +13,57 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_NORMAL_TABLE_H
#define TBASE_MNODE_NORMAL_TABLE_H
#ifndef TDENGINE_MNODE_SDB_H
#define TDENGINE_MNODE_SDB_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
typedef enum {
SDB_KEY_TYPE_STRING,
SDB_KEY_TYPE_AUTO
} ESdbKeyType;
int32_t mgmtInitNormalTables();
void mgmtCleanUpNormalTables();
void * mgmtGetNormalTable(char *tableId);
typedef enum {
SDB_OPER_TYPE_GLOBAL,
SDB_OPER_TYPE_LOCAL
} ESdbOperType;
void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
typedef struct {
ESdbOperType type;
void * table;
void * pObj;
int64_t version;
int32_t maxRowSize;
int32_t rowSize;
void * rowData;
} SSdbOperDesc;
int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
typedef struct {
char *tableName;
int32_t hashSessions;
int32_t maxRowSize;
ESdbKeyType keyType;
int32_t (*insertFp)(SSdbOperDesc *pOper);
int32_t (*deleteFp)(SSdbOperDesc *pOper);
int32_t (*updateFp)(SSdbOperDesc *pOper);
int32_t (*encodeFp)(SSdbOperDesc *pOper);
int32_t (*decodeFp)(SSdbOperDesc *pDesc);
int32_t (*destroyFp)(SSdbOperDesc *pDesc);
} SSdbTableDesc;
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void * sdbOpenTable(SSdbTableDesc *desc);
void sdbCloseTable(void *handle);
void mgmtDropAllNormalTables(SDbObj *pDropDb);
int32_t sdbInsertRow(SSdbOperDesc *pOper);
int32_t sdbDeleteRow(SSdbOperDesc *pOper);
int32_t sdbUpdateRow(SSdbOperDesc *pOper);
void *sdbGetRow(void *handle, void *key);
void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
int64_t sdbGetNumOfRows(void *handle);
uint64_t sdbGetVersion();
#ifdef __cplusplus
}
......
......@@ -22,30 +22,19 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
#include "taosdef.h"
#include "mnode.h"
int32_t mgmtInitSuperTables();
void mgmtCleanUpSuperTables();
void * mgmtGetSuperTable(char *tableId);
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pTable);
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName);
int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName);
int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable);
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
void mgmtCreateSuperTable(SQueuedMsg *pMsg);
void mgmtDropSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable);
void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable);
void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable);
void mgmtDropAllSuperTables(SDbObj *pDropDb);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
#ifdef __cplusplus
}
......
......@@ -27,13 +27,8 @@ extern "C" {
int32_t mgmtInitTables();
void mgmtCleanUpTables();
STableInfo* mgmtGetTable(char *tableId);
STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
STableInfo* mgmtGetTable(char* tableId);
void mgmtExtractTableName(char* tableId, char* tableName);
#ifdef __cplusplus
}
......
......@@ -24,7 +24,7 @@ extern "C" {
int32_t mgmtInitUsers();
void mgmtCleanUpUsers();
SUserObj *mgmtGetUser(char *name);
SUserObj *mgmtGetUserFromConn(void *pConn);
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp);
#ifdef __cplusplus
}
......
......@@ -27,18 +27,15 @@ extern "C" {
int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
void mgmtDropAllVgroups(SDbObj *pDropDb);
void mgmtCreateVgroup(SQueuedMsg *pMsg);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtUpdateVgroupIp(SDnodeObj *pDnode);
void mgmtSetVgroupIdPool();
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle);
......
......@@ -85,7 +85,9 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) {
pUser->prev->next = pUser->next;
}
if (pUser->next) pUser->next->prev = pUser->prev;
if (pUser->next) {
pUser->next->prev = pUser->prev;
}
if (pUser->prev == NULL) {
pAcct->pUser = pUser->next;
......
此差异已折叠。
......@@ -42,7 +42,7 @@ int32_t mgmtInitDClient() {
rpcInit.label = "MND-DC";
rpcInit.numOfThreads = 1;
rpcInit.cfp = mgmtProcessRspFromDnode;
rpcInit.sessions = tsMaxDnodes * 5;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "mgmtDClient";
......
......@@ -45,7 +45,7 @@ int32_t mgmtInitDServer() {
rpcInit.label = "MND-DS";
rpcInit.numOfThreads = 1;
rpcInit.cfp = mgmtProcessMsgFromDnode;
rpcInit.sessions = tsMaxDnodes * 5;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = mgmtDServerRetrieveAuth;
......
此差异已折叠。
......@@ -76,7 +76,7 @@ bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......@@ -169,7 +169,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......@@ -256,7 +256,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo
static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......@@ -547,9 +547,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return ;
}
uint32_t lastPrivateIp = pDnode->privateIp;
uint32_t lastPublicIp = pDnode->publicIp;
pDnode->privateIp = htonl(pStatus->privateIp);
pDnode->publicIp = htonl(pStatus->publicIp);
pDnode->lastReboot = htonl(pStatus->lastReboot);
......@@ -566,11 +563,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mgmtSetDnodeMaxVnodes(pDnode);
}
if (lastPrivateIp != pDnode->privateIp || lastPublicIp != pDnode->publicIp) {
mgmtUpdateVgroupIp(pDnode);
//mgmtUpdateMnodeIp();
}
int32_t openVnodes = htons(pStatus->openVnodes);
for (int32_t j = 0; j < openVnodes; ++j) {
pDnode->vload[j].vgId = htonl(pStatus->load[j].vgId);
......@@ -599,7 +591,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return;
}
mgmtGetMnodeIpList(&pRsp->ipList);
mgmtGetMnodePrivateIpList(&pRsp->ipList);
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId);
pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -13,3 +13,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_LIBRARY(query ${SRC})
TARGET_LINK_LIBRARIES(query tsdb tutil m rt)
ENDIF ()
ADD_SUBDIRECTORY(tests)
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册