提交 adb62142 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/develop' into feature/crash_gen

......@@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql);
void tscRemoveFromSqlList(SSqlObj *pSql);
void tscAddIntoStreamList(SSqlStream *pStream);
void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj);
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj);
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj);
void tscKillQuery(STscObj *pObj, uint32_t killId);
void tscKillStream(STscObj *pObj, uint32_t killId);
void tscKillConnection(STscObj *pObj);
......
......@@ -63,19 +63,21 @@ typedef struct SLocalReducer {
// char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
int32_t status; // denote it is in reduce process, in reduce process, it
bool hasPrevRow; // cannot be released
int32_t rowSize; // size of each intermediate result.
int32_t finalRowSize; // final result row size
int32_t status; // denote it is in reduce process, in reduce process, it
bool hasPrevRow; // cannot be released
bool hasUnprocessedRow;
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure
SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo
tFilePage * discardData;
SResultInfo * pResInfo;
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
} SLocalReducer;
typedef struct SSubqueryState {
......
......@@ -285,8 +285,6 @@ typedef struct {
typedef struct STscObj {
void * signature;
void * pTimer;
char mnodeIp[TSDB_USER_LEN];
uint16_t mnodePort;
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char acctId[TSDB_DB_NAME_LEN];
......@@ -294,6 +292,7 @@ typedef struct STscObj {
char sversion[TSDB_VERSION_LEN];
char writeAuth : 1;
char superAuth : 1;
uint32_t connId;
struct SSqlObj * pHb;
struct SSqlObj * sqlList;
struct SSqlStream *streamList;
......
......@@ -341,16 +341,6 @@ bool stableQueryFunctChanged(int32_t funcId) {
*/
void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; }
void initResultInfo(SResultInfo *pResInfo) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen);
}
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable) {
assert(pResInfo->interResultBuf == NULL);
......@@ -387,9 +377,7 @@ static bool function_setup(SQLFunctionCtx *pCtx) {
*/
static void function_finalizer(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) {
tscTrace("no result generated, result is set to NULL");
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
} else {
......
......@@ -48,7 +48,7 @@ static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
int32_t radix = 10;
int32_t radixList[3] = {16, 8, 2};
int32_t radixList[3] = {16, 8, 2}; // the integer number with different radix: hex, oct, bin
if (pToken->type == TK_HEX || pToken->type == TK_OCT || pToken->type == TK_BIN) {
radix = radixList[pToken->type - TK_HEX];
}
......
......@@ -494,7 +494,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql;
pSql->pTscObj = pObj;
//pSql->pTscObj->pSql = pSql;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pStmt->pSql = pSql;
......@@ -515,7 +514,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
//doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pSql->param = (void*)pSql;
pSql->param = (void*) pSql;
pSql->fp = waitForQueryRsp;
pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
......
......@@ -19,6 +19,7 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "taosmsg.h"
void tscSaveSlowQueryFp(void *handle, void *tmrId);
void *tscSlowQueryConn = NULL;
......@@ -96,7 +97,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
}
tscTrace("%p query time:%" PRId64 " sql:%s", pSql, pSql->res.useconds, pSql->sqlstr);
int32_t sqlSize = TSDB_SHOW_SQL_LEN + size;
int32_t sqlSize = TSDB_SLOW_QUERY_SQL_LEN + size;
char *sql = malloc(sqlSize);
if (sql == NULL) {
......@@ -106,9 +107,9 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
int len = snprintf(sql, size, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName,
pSql->pTscObj->user, pSql->stime, pSql->res.useconds);
int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr);
if (sqlLen > TSDB_SHOW_SQL_LEN - 1) {
sqlLen = len + TSDB_SHOW_SQL_LEN - 1;
int sqlLen = snprintf(sql + len, TSDB_SLOW_QUERY_SQL_LEN, "%s", pSql->sqlstr);
if (sqlLen > TSDB_SLOW_QUERY_SQL_LEN - 1) {
sqlLen = len + TSDB_SLOW_QUERY_SQL_LEN - 1;
} else {
sqlLen += len;
}
......@@ -205,28 +206,28 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
}
pthread_mutex_unlock(&pObj->mutex);
if (pStream) {
tscTrace("%p stream:%p is killed, streamId:%d", pStream->pSql, pStream, killId);
if (pStream->callback) {
pStream->callback(pStream->param);
}
taos_close_stream(pStream);
} else {
tscError("failed to kill stream, streamId:%d not exist", killId);
}
if (pStream->callback) {
pStream->callback(pStream->param);
}
taos_close_stream(pStream);
}
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SCMHeartBeatMsg *pHeartbeat = pMsg;
int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams;
SQqueryList *pQList = (SQqueryList *)pMsg;
pQList->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc*)(pMsg + sizeof(SQqueryList));
pHeartbeat->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
// We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SQqueryList);
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
/*
......@@ -240,47 +241,46 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
strncpy(pQdesc->sql, pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
pQdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0;
pQdesc->stime = pSql->stime;
pQdesc->queryId = pSql->queryId;
pQdesc->useconds = pSql->res.useconds;
pQdesc->stime = htobe64(pSql->stime);
pQdesc->queryId = htonl(pSql->queryId);
pQdesc->useconds = htobe64(pSql->res.useconds);
pQList->numOfQueries++;
pHeartbeat->numOfQueries++;
pQdesc++;
pSql = pSql->next;
pMsg += sizeof(SQueryDesc);
if (pMsg > pMax) break;
if (pHeartbeat->numOfQueries >= allocedQueriesNum) break;
}
SStreamList *pSList = (SStreamList *)pMsg;
pSList->numOfStreams = 0;
SStreamDesc *pSdesc = (SStreamDesc*) (pMsg + sizeof(SStreamList));
pHeartbeat->numOfStreams = 0;
SStreamDesc *pSdesc = (SStreamDesc *)pQdesc;
pMsg += sizeof(SStreamList);
SSqlStream *pStream = pObj->streamList;
while (pStream) {
strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
pSdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0;
pSdesc->streamId = pStream->streamId;
pSdesc->num = pStream->num;
pSdesc->streamId = htonl(pStream->streamId);
pSdesc->num = htobe64(pStream->num);
pSdesc->useconds = pStream->useconds;
pSdesc->stime = pStream->stime - pStream->interval;
pSdesc->ctime = pStream->ctime;
pSdesc->useconds = htobe64(pStream->useconds);
pSdesc->stime = htobe64(pStream->stime - pStream->interval);
pSdesc->ctime = htobe64(pStream->ctime);
pSdesc->slidingTime = pStream->slidingTime;
pSdesc->interval = pStream->interval;
pSdesc->slidingTime = htobe64(pStream->slidingTime);
pSdesc->interval = htobe64(pStream->interval);
pSList->numOfStreams++;
pHeartbeat->numOfStreams++;
pSdesc++;
pStream = pStream->next;
pMsg += sizeof(SStreamDesc);
if (pMsg > pMax) break;
if (pHeartbeat->numOfStreams >= allocedStreamsNum) break;
}
/* pthread_mutex_unlock (&pObj->mutex); */
int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
sizeof(SCMHeartBeatMsg);
pHeartbeat->connId = htonl(pObj->connId);
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);
return pMsg;
return msgLen;
}
void tscKillConnection(STscObj *pObj) {
......
......@@ -93,7 +93,7 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo
static int32_t validateDNodeConfig(tDCLSQL* pOptions);
static int32_t validateLocalConfig(tDCLSQL* pOptions);
static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
......@@ -531,7 +531,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_KILL_QUERY:
case TSDB_SQL_KILL_STREAM:
case TSDB_SQL_KILL_CONNECTION: {
if ((code = setKillInfo(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
if ((code = setKillInfo(pSql, pInfo, pInfo->type)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1881,22 +1881,38 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
// functions can not be applied to normal columns
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
if (index.columnIndex < numOfCols) {
if (index.columnIndex < numOfCols && index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) {
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
}
index.columnIndex -= numOfCols;
if (index.columnIndex > 0) {
index.columnIndex -= numOfCols;
}
// 2. valid the column type
int16_t colType = pSchema[index.columnIndex].type;
if (colType == TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) {
int16_t colType = 0;
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
colType = TSDB_DATA_TYPE_BINARY;
} else {
colType = pSchema[index.columnIndex].type;
}
if (colType == TSDB_DATA_TYPE_BOOL) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
SSchema s = pTagSchema[index.columnIndex];
SSchema s = {0};
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
s.bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
s.type = TSDB_DATA_TYPE_BINARY;
s.colId = TSDB_TBNAME_COLUMN_INDEX;
} else {
s = pTagSchema[index.columnIndex];
}
int16_t bytes = 0;
int16_t type = 0;
int32_t inter = 0;
......@@ -2229,37 +2245,45 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg1 = "invalid ip address";
const char* msg2 = "invalid port";
int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
const char* msg1 = "invalid connection ID";
const char* msg2 = "invalid query ID";
const char* msg3 = "invalid stream ID";
SSqlCmd* pCmd = &pSql->cmd;
pCmd->command = pInfo->type;
SSQLToken* ip = &(pInfo->pDCLInfo->ip);
if (ip->n > TSDB_KILL_MSG_LEN) {
SSQLToken* idStr = &(pInfo->pDCLInfo->ip);
if (idStr->n > TSDB_KILL_MSG_LEN) {
return TSDB_CODE_INVALID_SQL;
}
strncpy(pCmd->payload, ip->z, ip->n);
strncpy(pCmd->payload, idStr->z, idStr->n);
const char delim = ':';
char* connIdStr = strtok(idStr->z, &delim);
char* queryIdStr = strtok(NULL, &delim);
char* ipStr = strtok(ip->z, &delim);
char* portStr = strtok(NULL, &delim);
if (!validateIpAddress(ipStr, strlen(ipStr))) {
int32_t connId = (int32_t)strtol(connIdStr, NULL, 10);
if (connId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload));
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
uint16_t port = (uint16_t)strtol(portStr, NULL, 10);
if (port <= 0 || port > 65535) {
memset(pCmd->payload, 0, strlen(pCmd->payload));
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
if (killType == TSDB_SQL_KILL_CONNECTION) {
return TSDB_CODE_SUCCESS;
}
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
if (queryId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload));
if (killType == TSDB_SQL_KILL_QUERY) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} else {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -217,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->numOfBuffer = numOfFlush;
pReducer->numOfVnode = numOfBuffer;
pReducer->pDesc = pDesc;
tscTrace("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer);
......@@ -278,6 +278,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
if (pReducer->pLoserTree == NULL || pRes->code != 0) {
......@@ -309,10 +310,10 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprList);
pReducer->finalRowSize = tscGetResRowLength(pQueryInfo->exprList);
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength;
assert(finalRowLength <= pReducer->rowSize);
pReducer->resColModel->capacity = pReducer->nResultBufSize / pReducer->finalRowSize;
assert(pReducer->finalRowSize <= pReducer->rowSize);
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
// pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize);
......@@ -389,7 +390,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor
assert(pPage->num <= pDesc->pColumnModel->capacity);
// sort before flush to disk, the data must be consecutively put on tFilePage.
if (pDesc->orderIdx.numOfCols > 0) {
if (pDesc->orderInfo.numOfCols > 0) {
tColDataQSort(pDesc, pPage->num, 0, pPage->num - 1, pPage->data, orderType);
}
......@@ -590,12 +591,10 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
// disable merge procedure for column projection query
int16_t functionId = pReducer->pCtx[0].functionId;
assert(functionId != TSDB_FUNC_ARITHM);
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pReducer->orderPrjOnSTable) {
return true;
}
......@@ -604,26 +603,33 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
tOrderDescriptor *pOrderDesc = pReducer->pDesc;
int32_t numOfCols = pOrderDesc->orderIdx.numOfCols;
SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo;
// no group by columns, all data belongs to one group
int32_t numOfCols = orderInfo->numOfCols;
if (numOfCols <= 0) {
return true;
}
if (pOrderDesc->orderIdx.pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { //<= 0
// super table interval query
if (orderInfo->pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
/*
* super table interval query
* if the order columns is the primary timestamp, all result data belongs to one group
*/
assert(pQueryInfo->intervalTime > 0);
pOrderDesc->orderIdx.numOfCols -= 1;
if (numOfCols == 1) {
return true;
}
} else { // simple group by query
assert(pQueryInfo->intervalTime == 0);
}
// only one row exists
int32_t ret = compare_a(pOrderDesc, 1, 0, pPrev, 1, 0, tmpBuffer->data);
pOrderDesc->orderIdx.numOfCols = numOfCols;
return (ret == 0);
int32_t index = orderInfo->pData[0];
int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
return ret == 0;
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
......@@ -873,24 +879,24 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* by "interuptHandler" function in shell
*/
static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->pLocalReducer != pLocalReducer) {
/*
* Release the SSqlObj is called, and it is int destroying function invoked by other thread.
* However, the other thread will WAIT until current process fully completes.
* Since the flag of release struct is set by doLocalReduce function
*/
assert(pRes->pLocalReducer == NULL);
}
// if (pRes->pLocalReducer != pLocalReducer) {
// /*
// * Release the SSqlObj is called, and it is int destroying function invoked by other thread.
// * However, the other thread will WAIT until current process fully completes.
// * Since the flag of release struct is set by doLocalReduce function
// */
// assert(pRes->pLocalReducer == NULL);
// }
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
// no interval query, no fill operation
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->num;
pRes->numOfClauseTotal += pRes->numOfRows;
......@@ -929,9 +935,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
}
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize);
pFinalDataPage->num = 0;
return;
}
......@@ -1037,16 +1041,13 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) {
// the tag columns need to be set before all functions execution
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
tVariantAssign(&pCtx->param[0], &pExpr->param[0]);
// tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer
int32_t functionId = pExpr->functionId;
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
tVariantDestroy(&pCtx->tag);
char* input = pCtx->aInputElemBuf;
......@@ -1057,17 +1058,20 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
} else {
tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType);
}
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
}
pCtx->currentStage = SECONDARY_STAGE_MERGE;
if (needInit) {
aAggs[pExpr->functionId].init(pCtx);
aAggs[pCtx->functionId].init(pCtx);
}
}
for (int32_t j = 0; j < size; ++j) {
int32_t functionId = tscSqlExprGet(pQueryInfo, j)->functionId;
int32_t functionId = pLocalReducer->pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
......@@ -1101,8 +1105,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
int32_t functionId = pExpr->functionId;
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
continue;
}
......@@ -1136,15 +1139,13 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
char *buf = malloc((size_t)maxBufSize);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
if (pExpr->functionId != TSDB_FUNC_TAG) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
if (pCtx->functionId != TSDB_FUNC_TAG) {
continue;
}
int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result
memset(buf, 0, (size_t)maxBufSize);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
memcpy(buf, pCtx->aOutputBuf, (size_t)pCtx->outputBytes);
for (int32_t i = 0; i < inc; ++i) {
......@@ -1160,8 +1161,8 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
aAggs[pExpr->functionId].xFinalize(&pLocalReducer->pCtx[k]);
SQLFunctionCtx* pCtx = &pLocalReducer->pCtx[k];
aAggs[pCtx->functionId].xFinalize(pCtx);
}
pLocalReducer->hasPrevRow = false;
......@@ -1182,13 +1183,13 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
*/
bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
int16_t functionId = pLocalReducer->pCtx[0].functionId;
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { // column projection query
ret = 1; // disable merge procedure
} else {
tOrderDescriptor *pDesc = pLocalReducer->pDesc;
if (pDesc->orderIdx.numOfCols > 0) {
if (pDesc->orderInfo.numOfCols > 0) {
if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
// todo refactor comparator
ret = compare_a(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
......@@ -1274,7 +1275,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doInterpolateResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
return true;
}
......@@ -1341,7 +1342,7 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
// the first column must be the timestamp column
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, false);
doFillResult(pSql, pLocalReducer, false);
}
return true;
......@@ -1374,7 +1375,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
pQueryInfo->slidingTimeUnit, tinfo.precision);
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, true);
doFillResult(pSql, pLocalReducer, true);
}
}
......@@ -1408,13 +1409,11 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, k);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
pCtx->aOutputBuf += pCtx->outputBytes * numOfRes;
// set the correct output timestamp column position
if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM) {
if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) {
pCtx->ptsOutputBuf = ((char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * numOfRes);
}
}
......
......@@ -114,6 +114,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pIpList->numOfIps > 0)
tscSetMgmtIpList(pIpList);
pSql->pTscObj->connId = htonl(pRsp->connId);
if (pRsp->killConnection) {
tscKillConnection(pObj);
} else {
......@@ -332,9 +334,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? pRes->numOfRows: pRes->code;
tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);
bool shouldFree = tscShouldBeFreed(pSql);
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
......@@ -1129,13 +1130,6 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCMKillQueryMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload;
strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
switch (pCmd->command) {
case TSDB_SQL_KILL_QUERY:
pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
......@@ -1743,57 +1737,43 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
int size = 0;
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
size += tsRpcHeadSize;
size += sizeof(SQqueryList);
pthread_mutex_lock(&pObj->mutex);
int32_t numOfQueries = 2;
SSqlObj *tpSql = pObj->sqlList;
while (tpSql) {
size += sizeof(SQueryDesc);
tpSql = tpSql->next;
numOfQueries++;
}
size += sizeof(SStreamList);
int32_t numOfStreams = 2;
SSqlStream *pStream = pObj->streamList;
while (pStream) {
size += sizeof(SStreamDesc);
pStream = pStream->next;
numOfStreams++;
}
return size + TSDB_EXTRA_PAYLOAD_SIZE;
}
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart;
int msgLen = 0;
int size = 0;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
pthread_mutex_lock(&pObj->mutex);
size = tscEstimateHeartBeatMsgLength(pSql);
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex);
tscError("%p failed to malloc for heartbeat msg", pSql);
return -1;
}
pMsg = pCmd->payload;
pStart = pMsg;
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
pHeartbeat->numOfQueries = numOfQueries;
pHeartbeat->numOfStreams = numOfStreams;
int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
pthread_mutex_unlock(&pObj->mutex);
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS;
}
......@@ -2204,6 +2184,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0;
......@@ -2330,7 +2311,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
}
pRes->row = 0;
tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
tscTrace("%p numOfRows:%d, offset:%d, complete:%d", pSql, pRes->numOfRows, pRes->offset, pRes->completed);
return 0;
}
......
......@@ -88,7 +88,6 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
strncpy(pObj->user, user, TSDB_USER_LEN);
taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass);
pObj->mnodePort = port ? port : tsDnodeShellPort;
if (db) {
int32_t len = strlen(db);
......
......@@ -1557,8 +1557,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
// tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
// pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%s, orderOfSub:%d", pPObj, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
......@@ -1713,6 +1713,11 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
// increase the total inserted rows
if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows;
} else {
SSqlObj* pSql = (SSqlObj*) tres;
assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code;
}
taos_free_result(tres);
......@@ -1947,7 +1952,8 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
size_t size = tscNumOfFields(pQueryInfo);
for (int i = 0; i < size; ++i) {
SFieldSupInfo* pSup = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i);
if (pSup->pSqlExpr != NULL) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
......
......@@ -404,8 +404,6 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
pSql->numOfSubs = 0;
tscResetSqlCmdObj(pCmd);
tscTrace("%p partially free sqlObj completed", pSql);
}
void tscFreeSqlObj(SSqlObj* pSql) {
......@@ -2104,7 +2102,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
}
void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, columnIndex);
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex);//tscFieldInfoGetSupp(pFieldInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType;
......
......@@ -251,7 +251,7 @@ void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows);
// ----------------- K-V data row structure
/*
......
......@@ -450,7 +450,8 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
int iter1 = 0;
int iter2 = 0;
tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge);
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows,
pTarget->numOfRows + rowsToMerge);
}
tdFreeDataCols(pTarget);
......@@ -461,15 +462,15 @@ _err:
return -1;
}
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
// TODO: add resolve duplicate key here
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) {
tdResetDataCols(target);
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
while (target->numOfRows < tRows) {
if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break;
if (*iter1 >= limit1 && *iter2 >= limit2) break;
TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
if (key1 <= key2) {
for (int i = 0; i < src1->numOfCols; i++) {
......
......@@ -112,7 +112,7 @@ int32_t tsMaxShellConns = 5000;
char tsDefaultDB[TSDB_DB_NAME_LEN] = {0};
char tsDefaultUser[64] = "root";
char tsDefaultPass[64] = "taosdata";
int32_t tsMaxConnections = 50;
int32_t tsMaxConnections = 5000;
int32_t tsBalanceInterval = 300; // seconds
int32_t tsOfflineThreshold = 86400*100; // seconds 10days
......@@ -840,7 +840,7 @@ static void doInitGlobalConfig() {
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1;
cfg.maxValue = 100;
cfg.maxValue = 100000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
......
......@@ -150,6 +150,7 @@ class CTaosInterface(object):
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
libtaos.taos_errno.restype = ctypes.c_int
libtaos.taos_query.restype = ctypes.POINTER(ctypes.c_void_p)
def __init__(self, config=None):
'''
......
......@@ -150,6 +150,7 @@ class CTaosInterface(object):
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
libtaos.taos_errno.restype = ctypes.c_int
libtaos.taos_query.restype = ctypes.POINTER(ctypes.c_void_p)
def __init__(self, config=None):
'''
......
......@@ -150,6 +150,7 @@ class CTaosInterface(object):
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
libtaos.taos_errno.restype = ctypes.c_int
libtaos.taos_query.restype = ctypes.POINTER(ctypes.c_void_p)
def __init__(self, config=None):
'''
......
......@@ -150,6 +150,7 @@ class CTaosInterface(object):
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
libtaos.taos_errno.restype = ctypes.c_int
libtaos.taos_query.restype = ctypes.POINTER(ctypes.c_void_p)
def __init__(self, config=None):
'''
......
......@@ -46,7 +46,7 @@ typedef struct {
void (*cleanup)();
} SDnodeComponent;
static const SDnodeComponent SDnodeComponents[] = {
static const SDnodeComponent tsDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
......@@ -72,14 +72,14 @@ static int dnodeCreateDir(const char *dir) {
static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
SDnodeComponents[i].cleanup();
tsDnodeComponents[i].cleanup();
}
}
static int32_t dnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]); i++) {
if (SDnodeComponents[i].init() != 0) {
for (int32_t i = 0; i < sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]); i++) {
if (tsDnodeComponents[i].init() != 0) {
dnodeCleanupComponents(i);
code = -1;
break;
......@@ -133,7 +133,7 @@ int32_t dnodeInitSystem() {
void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupComponents(sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]) - 1);
dnodeCleanupComponents(sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]) - 1);
taos_cleanup();
taosCloseLog();
}
......
......@@ -231,7 +231,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SHELL_VNODE_BITS 24
#define TSDB_SHELL_SID_MASK 0xFF
#define TSDB_HTTP_TOKEN_LEN 20
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SHOW_SQL_LEN 64
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_METER_STATE_OFFLINE 0
#define TSDB_METER_STATE_ONLLINE 1
......
......@@ -150,6 +150,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 0x0405, "no disk perm
TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 0x0406, "file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 0x0407, "memory corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUCH_FILE_OR_DIR, 0, 0x0408, "no such file or directory")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_SHELL_CONNS, 0, 0x0409, "too many shell conns")
// client
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 0x0481, "invalid client version")
......
......@@ -137,6 +137,7 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_SCORES,
TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_STREAMTABLES,
TSDB_MGMT_TABLE_MAX,
};
......@@ -299,6 +300,9 @@ typedef struct {
char serverVersion[TSDB_VERSION_LEN];
int8_t writeAuth;
int8_t superAuth;
int8_t reserved1;
int8_t reserved2;
int32_t connId;
SRpcIpSet ipList;
} SCMConnectRsp;
......@@ -716,16 +720,10 @@ typedef struct {
} SStreamDesc;
typedef struct {
int32_t numOfQueries;
} SQqueryList;
typedef struct {
int32_t numOfStreams;
} SStreamList;
typedef struct {
SQqueryList qlist;
SStreamList slist;
uint32_t connId;
int32_t numOfQueries;
int32_t numOfStreams;
char pData[];
} SCMHeartBeatMsg;
typedef struct {
......@@ -733,6 +731,7 @@ typedef struct {
uint32_t streamId;
uint32_t totalDnodes;
uint32_t onlineDnodes;
uint32_t connId;
int8_t killConnection;
SRpcIpSet ipList;
} SCMHeartBeatRsp;
......
......@@ -188,9 +188,10 @@ typedef void *TsdbPosT;
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param groupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo);
TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
......@@ -202,11 +203,11 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
* @param groupInfo tableId list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo);
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo);
/**
* move to next block if exists
......
......@@ -148,8 +148,8 @@
#define TK_SET 130
#define TK_KILL 131
#define TK_CONNECTION 132
#define TK_COLON 133
#define TK_STREAM 134
#define TK_STREAM 133
#define TK_COLON 134
#define TK_ABORT 135
#define TK_AFTER 136
#define TK_ATTACH 137
......
......@@ -182,8 +182,6 @@ typedef struct SUserObj {
int8_t updateEnd[1];
int32_t refCount;
struct SAcctObj * pAcct;
SQqueryList * pQList; // query list
SStreamList * pSList; // stream list
} SUserObj;
typedef struct {
......
......@@ -21,9 +21,30 @@ extern "C" {
#endif
#include "mnodeDef.h"
typedef struct {
char user[TSDB_USER_LEN + 1];
int8_t killed;
uint16_t port;
uint32_t ip;
uint32_t connId;
uint64_t stime;
uint64_t lastAccess;
uint32_t queryId;
uint32_t streamId;
int32_t numOfQueries;
int32_t numOfStreams;
SStreamDesc *pStreams;
SQueryDesc * pQueries;
} SConnObj;
int32_t mnodeInitProfile();
void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port);
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -28,6 +28,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp);
void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
#ifdef __cplusplus
}
......
......@@ -33,14 +33,54 @@
#include "mnodeUser.h"
#include "mnodeTable.h"
#include "mnodeShow.h"
#include "mnodeProfile.h"
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
} SMnodeComponent;
void *tsMnodeTmr;
static bool tsMgmtIsRunning = false;
static const SMnodeComponent tsMnodeComponents[] = {
{"profile", mnodeInitProfile, mnodeCleanupProfile},
{"accts", mnodeInitAccts, mnodeCleanupAccts},
{"users", mnodeInitUsers, mnodeCleanupUsers},
{"dnodes", mnodeInitDnodes, mnodeCleanupDnodes},
{"dbs", mnodeInitDbs, mnodeCleanupDbs},
{"vgroups", mnodeInitVgroups, mnodeCleanupVgroups},
{"tables", mnodeInitTables, mnodeCleanupTables},
{"mnodes", mnodeInitMnodes, mnodeCleanupMnodes},
{"sdb", sdbInit, sdbCleanUp},
{"balance", balanceInit, balanceCleanUp},
{"grant", grantInit, grantCleanUp},
{"show", mnodeInitShow, mnodeCleanUpShow}
};
static void mnodeInitTimer();
static void mnodeCleanupTimer();
static bool mnodeNeedStart() ;
static void mnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
tsMnodeComponents[i].cleanup();
}
}
static int32_t mnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) {
if (tsMnodeComponents[i].init() != 0) {
mnodeCleanupComponents(i);
code = -1;
break;
}
}
return code;
}
int32_t mnodeStartSystem() {
if (tsMgmtIsRunning) {
mPrint("mnode module already started...");
......@@ -57,57 +97,7 @@ int32_t mnodeStartSystem() {
dnodeAllocateMnodeRqueue();
dnodeAllocateMnodePqueue();
if (mnodeInitAccts() < 0) {
mError("failed to init accts");
return -1;
}
if (mnodeInitUsers() < 0) {
mError("failed to init users");
return -1;
}
if (mnodeInitDnodes() < 0) {
mError("failed to init dnodes");
return -1;
}
if (mnodeInitDbs() < 0) {
mError("failed to init dbs");
return -1;
}
if (mnodeInitVgroups() < 0) {
mError("failed to init vgroups");
return -1;
}
if (mnodeInitTables() < 0) {
mError("failed to init tables");
return -1;
}
if (mnodeInitMnodes() < 0) {
mError("failed to init mnodes");
return -1;
}
if (sdbInit() < 0) {
mError("failed to init sdb");
return -1;
}
if (balanceInit() < 0) {
mError("failed to init balance")
}
if (grantInit() < 0) {
mError("failed to init grant");
return -1;
}
if (mnodeInitShow() < 0) {
mError("failed to init show");
if (mnodeInitComponents() != 0) {
return -1;
}
......@@ -115,7 +105,6 @@ int32_t mnodeStartSystem() {
tsMgmtIsRunning = true;
mPrint("mnode is initialized successfully");
return 0;
}
......@@ -133,17 +122,8 @@ void mnodeCleanupSystem() {
dnodeFreeMnodeRqueue();
dnodeFreeMnodePqueue();
mnodeCleanupTimer();
mnodeCleanUpShow();
grantCleanUp();
balanceCleanUp();
sdbCleanUp();
mnodeCleanupMnodes();
mnodeCleanupTables();
mnodeCleanupVgroups();
mnodeCleanupDbs();
mnodeCleanupDnodes();
mnodeCleanupUsers();
mnodeCleanupAccts();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mPrint("mnode is cleaned up");
}
......
此差异已折叠。
......@@ -227,21 +227,47 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
SRpcConnInfo connInfo;
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
int32_t connId = htonl(pHBMsg->connId);
SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) {
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort);
}
if (pConn == NULL) {
// do not close existing links, otherwise
// mError("failed to create connId, close connect");
// pHBRsp->killConnection = 1;
} else {
pHBRsp->connId = htonl(pConn->connId);
mnodeSaveQueryStreamList(pConn, pHBMsg);
if (pConn->killed != 0) {
pHBRsp->killConnection = 1;
}
if (pConn->streamId != 0) {
pHBRsp->streamId = htonl(pConn->streamId);
pConn->streamId = 0;
}
if (pConn->queryId != 0) {
pHBRsp->queryId = htonl(pConn->queryId);
pConn->queryId = 0;
}
}
pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum());
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeIpSetForShell(&pHBRsp->ipList);
/*
* TODO
* Dispose kill stream or kill query message
*/
pHBRsp->queryId = 0;
pHBRsp->streamId = 0;
pHBRsp->killConnection = 0;
pMsg->rpcRsp.rsp = pHBRsp;
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
mnodeReleaseConn(pConn);
return TSDB_CODE_SUCCESS;
}
......@@ -281,11 +307,19 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
goto connect_over;
}
SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) {
code = terrno;
} else {
pConnectRsp->connId = htonl(pConn->connId);
mnodeReleaseConn(pConn);
}
sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
strcpy(pConnectRsp->serverVersion, version);
pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth;
mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList);
connect_over:
......@@ -358,3 +392,11 @@ static void mnodeReleaseShowObj(void *pShow, bool forceRemove) {
mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false");
taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove);
}
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
}
}
}
\ No newline at end of file
......@@ -61,8 +61,8 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg);
......@@ -568,8 +568,8 @@ int32_t mnodeInitTables() {
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeGetStreamTableMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeRetrieveStreamTables);
return TSDB_CODE_SUCCESS;
}
......@@ -1284,7 +1284,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
char *msg = (char *)pRsp + sizeof(SCMSTableVgroupRspMsg);
for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i;
char * stableName = (char *)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN)*i;
SSuperTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable == NULL) {
mError("stable:%s, not exist while get stable vgroup info", stableName);
......@@ -1294,41 +1294,48 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
if (pTable->vgHash == NULL) {
mError("stable:%s, not vgroup exist while get stable vgroup info", stableName);
mnodeDecTableRef(pTable);
continue;
}
SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg;
// even this super table has no corresponding table, still return
pRsp->numOfTables++;
SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash);
int32_t vgSize = 0;
while (taosHashIterNext(pIter)) {
int32_t *pVgId = taosHashIterGet(pIter);
SVgObj * pVgroup = mnodeGetVgroup(*pVgId);
if (pVgroup == NULL) continue;
SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg;
pVgroupInfo->numOfVgroups = 0;
msg += sizeof(SVgroupsInfo);
} else {
SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg;
pVgroupInfo->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
if (pDnode == NULL) break;
SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash);
int32_t vgSize = 0;
while (taosHashIterNext(pIter)) {
int32_t *pVgId = taosHashIterGet(pIter);
SVgObj * pVgroup = mnodeGetVgroup(*pVgId);
if (pVgroup == NULL) continue;
strncpy(pVgroupInfo->vgroups[vgSize].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn));
pVgroupInfo->vgroups[vgSize].ipAddr[vn].port = htons(pDnode->dnodePort);
pVgroupInfo->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
if (pDnode == NULL) break;
pVgroupInfo->vgroups[vgSize].numOfIps++;
}
strncpy(pVgroupInfo->vgroups[vgSize].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn));
pVgroupInfo->vgroups[vgSize].ipAddr[vn].port = htons(pDnode->dnodePort);
vgSize++;
mnodeDecVgroupRef(pVgroup);
}
pVgroupInfo->vgroups[vgSize].numOfIps++;
}
taosHashDestroyIter(pIter);
mnodeDecTableRef(pTable);
vgSize++;
mnodeDecVgroupRef(pVgroup);
}
taosHashDestroyIter(pIter);
mnodeDecTableRef(pTable);
pVgroupInfo->numOfVgroups = htonl(vgSize);
pVgroupInfo->numOfVgroups = htonl(vgSize);
// one table is done, try the next table
msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo);
pRsp->numOfTables++;
// one table is done, try the next table
msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo);
pRsp->numOfTables++;
}
}
if (pRsp->numOfTables != numOfTable) {
......@@ -2111,14 +2118,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
return 0;
}
static void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
}
}
}
static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
......@@ -2262,7 +2261,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
return code;
}
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
......@@ -2308,7 +2307,7 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
return 0;
}
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
......
......@@ -183,7 +183,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.slowquery(ts timestamp, username "
"binary(%d), created_time timestamp, time bigint, sql binary(%d))",
tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SHOW_SQL_LEN);
tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SLOW_QUERY_SQL_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_LOG) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.log(ts timestamp, level tinyint, "
......
......@@ -172,10 +172,11 @@ typedef struct SQueryRuntimeEnv {
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostInfo summary;
bool stableQuery; // super table query or not
bool stableQuery; // super table query or not
void* pQueryHandle;
void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool topBotQuery; // false;
} SQueryRuntimeEnv;
typedef struct SQInfo {
......
......@@ -31,7 +31,8 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t curTimeWindow(SWindowResInfo *pWindowResInfo);
#define curTimeWindow(_winres) ((_winres)->curIndex)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
......
......@@ -28,7 +28,7 @@ extern "C" {
#include "tdataformat.h"
#include "talgo.h"
#define DEFAULT_PAGE_SIZE (1024L*56) // 16k larger than the SHistoInfo
#define DEFAULT_PAGE_SIZE (1024L*64) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
......@@ -96,7 +96,7 @@ typedef struct SColumnOrderInfo {
typedef struct tOrderDescriptor {
SColumnModel * pColumnModel;
int32_t tsOrder; // timestamp order type if exists
SColumnOrderInfo orderIdx;
SColumnOrderInfo orderInfo;
} tOrderDescriptor;
typedef struct tExtMemBuffer {
......
......@@ -85,7 +85,7 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id
* @return
*/
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id);
#define GET_RES_BUF_PAGE_BY_ID(buf, id) ((tFilePage*)((buf)->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE*(id)))
/**
* get the total buffer size in the format of disk file
......
......@@ -647,9 +647,9 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
}
////////////////////////////////////////kill statement///////////////////////////////////////
cmd ::= KILL CONNECTION IPTOKEN(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &X);}
cmd ::= KILL STREAM IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &X);}
cmd ::= KILL QUERY IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &X);}
cmd ::= KILL CONNECTION INTEGER(Y). {setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &Y);}
cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &X);}
cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &X);}
%fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
......
......@@ -272,9 +272,18 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi
bool stableQueryFunctChanged(int32_t funcId);
void resetResultInfo(SResultInfo *pResInfo);
void initResultInfo(SResultInfo *pResInfo);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen);
}
#ifdef __cplusplus
}
#endif
......
此差异已折叠。
......@@ -206,11 +206,6 @@ bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
return (getWindowResult(pWindowResInfo, slot)->status.closed == true);
}
int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size);
return pWindowResInfo->curIndex;
}
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
getWindowResult(pWindowResInfo, slot)->status.closed = true;
}
......
......@@ -356,17 +356,15 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) {
switch (type) {
case TSDB_DATA_TYPE_INT: {
int32_t first = *(int32_t *)f1;
int32_t second = *(int32_t *)f2;
int32_t first = *(int32_t *) f1;
int32_t second = *(int32_t *) f2;
if (first == second) {
return 0;
}
return (first < second) ? -1 : 1;
};
case TSDB_DATA_TYPE_DOUBLE: {
//double first = *(double *)f1;
double first = GET_DOUBLE_VAL(f1);
//double second = *(double *)f2;
double first = GET_DOUBLE_VAL(f1);
double second = GET_DOUBLE_VAL(f2);
if (first == second) {
return 0;
......@@ -374,9 +372,7 @@ static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, i
return (first < second) ? -1 : 1;
};
case TSDB_DATA_TYPE_FLOAT: {
//float first = *(float *)f1;
//float second = *(float *)f2;
float first = GET_FLOAT_VAL(f1);
float first = GET_FLOAT_VAL(f1);
float second = GET_FLOAT_VAL(f2);
if (first == second) {
return 0;
......@@ -439,9 +435,9 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t s2, char *data2) {
assert(numOfRows1 == numOfRows2);
int32_t cmpCnt = pDescriptor->orderIdx.numOfCols;
int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderIdx.pData[i];
int32_t colIdx = pDescriptor->orderInfo.pData[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
......@@ -471,9 +467,9 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t s2, char *data2) {
assert(numOfRows1 == numOfRows2);
int32_t cmpCnt = pDescriptor->orderIdx.numOfCols;
int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderIdx.pData[i];
int32_t colIdx = pDescriptor->orderInfo.pData[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
......@@ -563,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
int32_t midIdx = ((end - start) >> 1) + start;
#if defined(_DEBUG_VIEW)
int32_t f = pDescriptor->orderIdx.pData[0];
int32_t f = pDescriptor->orderInfo.pData[0];
char *midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f);
char *startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f);
char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f);
int32_t colIdx = pDescriptor->orderIdx.pData[0];
int32_t colIdx = pDescriptor->orderInfo.pData[0];
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx);
#endif
......@@ -596,7 +592,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
}
static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t numOfRows, char *d, int32_t len) {
int32_t colIdx = pDescriptor->orderIdx.pData[0];
int32_t colIdx = pDescriptor->orderInfo.pData[0];
for (int32_t i = 0; i < len; ++i) {
char *startx = COLMODEL_GET_VAL(d, pDescriptor->pColumnModel, numOfRows, i, colIdx);
......@@ -1062,9 +1058,9 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
desc->pColumnModel = pModel;
desc->tsOrder = tsOrderType;
desc->orderIdx.numOfCols = numOfOrderCols;
desc->orderInfo.numOfCols = numOfOrderCols;
for (int32_t i = 0; i < numOfOrderCols; ++i) {
desc->orderIdx.pData[i] = orderColIdx[i];
desc->orderInfo.pData[i] = orderColIdx[i];
}
return desc;
......
......@@ -50,12 +50,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
return TSDB_CODE_SUCCESS;
}
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id) {
assert(id < pResultBuf->numOfPages && id >= 0);
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE * id);
}
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
......@@ -169,7 +163,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
*pageId = (pResultBuf->allocateId++);
registerPageId(pResultBuf, groupId, *pageId);
tFilePage* page = getResultBufferPageById(pResultBuf, *pageId);
tFilePage* page = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pageId);
// clear memory for the new page
memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE);
......
此差异已折叠。
......@@ -127,7 +127,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
hash = rpcHashConn(pCache, fqdn, port, connType);
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
strcpy(pNode->fqdn, fqdn);
tstrncpy(pNode->fqdn, fqdn, sizeof(pNode->fqdn));
pNode->port = port;
pNode->connType = connType;
pNode->data = data;
......
......@@ -60,7 +60,7 @@ typedef struct {
void *idPool; // handle to ID pool
void *tmrCtrl; // handle to timer
void *hash; // handle returned by hash utility
SHashObj *hash; // handle returned by hash utility
void *tcphandle;// returned handle from TCP initialization
void *udphandle;// returned handle from UDP initialization
void *pCache; // connection cache
......@@ -211,7 +211,7 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL;
if(pInit->label) strcpy(pRpc->label, pInit->label);
if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
......@@ -228,7 +228,7 @@ void *rpcOpen(const SRpcInit *pInit) {
size_t size = sizeof(SRpcConn) * pRpc->sessions;
pRpc->connList = (SRpcConn *)calloc(1, size);
if (pRpc->connList == NULL) {
tError("%s failed to allocate memory for taos connections, size:%d", pRpc->label, size);
tError("%s failed to allocate memory for taos connections, size:%ld", pRpc->label, size);
rpcClose(pRpc);
return NULL;
}
......@@ -459,7 +459,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
pInfo->clientPort = pConn->peerPort;
// pInfo->serverIp = pConn->destIp;
strcpy(pInfo->user, pConn->user);
strncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
return 0;
}
......@@ -503,10 +503,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
pConn = rpcAllocateClientConn(pRpc);
if (pConn) {
strcpy(pConn->peerFqdn, peerFqdn);
tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
pConn->peerIp = peerIp;
pConn->peerPort = peerPort;
strcpy(pConn->user, pRpc->user);
tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
pConn->connType = connType;
if (taosOpenConn[connType]) {
......@@ -804,7 +804,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn = rpcGetConnObj(pRpc, sid, pRecv);
if (pConn == NULL) {
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno));
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
return NULL;
} else {
if (rpcIsReq(pHead->msgType)) {
......
......@@ -73,7 +73,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
pServerObj->ip = ip;
pServerObj->port = port;
strcpy(pServerObj->label, label);
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
pServerObj->numOfThreads = numOfThreads;
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
......@@ -87,7 +87,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) {
pThreadObj->processData = fp;
strcpy(pThreadObj->label, label);
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle;
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
......@@ -247,7 +247,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
memset(pThreadObj, 0, sizeof(SThreadObj));
strcpy(pThreadObj->label, label);
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->ip = ip;
pThreadObj->shandle = shandle;
......
......@@ -72,7 +72,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pSet->port = port;
pSet->shandle = shandle;
pSet->fp = fp;
strcpy(pSet->label, label);
tstrncpy(pSet->label, label, sizeof(pSet->label));
uint16_t ownPort;
for (int i = 0; i < threads; ++i) {
......@@ -99,7 +99,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
}
strcpy(pConn->label, label);
tstrncpy(pConn->label, label, sizeof(pConn->label));
pConn->shandle = shandle;
pConn->processData = fp;
pConn->index = i;
......
......@@ -76,6 +76,7 @@ int main(int argc, char *argv[]) {
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
char secret[TSDB_KEY_LEN] = "mypassword";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
......@@ -97,7 +98,7 @@ int main(int argc, char *argv[]) {
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael";
rpcInit.secret = "mypassword";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
......@@ -106,7 +107,7 @@ int main(int argc, char *argv[]) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn));
tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn[0]));
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
......
......@@ -77,6 +77,7 @@ int main(int argc, char *argv[]) {
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
char secret[TSDB_KEY_LEN] = "mypassword";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
......@@ -98,7 +99,7 @@ int main(int argc, char *argv[]) {
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael";
rpcInit.secret = "mypassword";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
......
......@@ -977,7 +977,8 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// tdResetDataCols(pHelper->pDataCols[1]);
while (true) {
if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5);
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
pDataCols, &iter2, rowsWritten, pHelper->config.maxRowsPerFileBlock * 4 / 5);
ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
......@@ -989,54 +990,6 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
}
round++;
blkIdx++;
// TODO: the blkIdx here is not correct
// if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) {
// if (pHelper->pDataCols[1]->numOfRows > 0) {
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1],
// pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
// goto _err;
// // TODO: the blkIdx here is not correct
// tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfRows);
// }
// }
// TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfRows
// ? INT64_MAX
// : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1];
// TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2];
// if (key1 < key2) {
// for (int i = 0; i < pDataCols->numOfCols; i++) {
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
// ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1),
// TYPE_BYTES[pDataCol->type]);
// }
// pHelper->pDataCols[1]->numOfRows++;
// iter1++;
// } else if (key1 == key2) {
// // TODO: think about duplicate key cases
// ASSERT(false);
// } else {
// for (int i = 0; i < pDataCols->numOfCols; i++) {
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
// ((char *)pDataCols->cols[i].pData +
// TYPE_BYTES[pDataCol->type] * iter2),
// TYPE_BYTES[pDataCol->type]);
// }
// pHelper->pDataCols[1]->numOfRows++;
// iter2++;
// }
// if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err;
// // TODO: blkIdx here is not correct, fix it
// tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
// tdResetDataCols(pHelper->pDataCols[1]);
// }
}
}
}
......
......@@ -17,6 +17,7 @@
#include "tulog.h"
#include "talgo.h"
#include "tutil.h"
#include "ttime.h"
#include "tcompare.h"
#include "exception.h"
......@@ -71,6 +72,8 @@ typedef struct STableCheckInfo {
int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols;
int32_t chosen; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator
......@@ -79,8 +82,6 @@ typedef struct STableCheckInfo {
typedef struct STableBlockInfo {
SCompBlock* compBlock;
STableCheckInfo* pTableCheckInfo;
// int32_t blockIndex;
// int32_t groupIdx; /* number of group is less than the total number of tables */
} STableBlockInfo;
typedef struct SBlockOrderSupporter {
......@@ -120,7 +121,7 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pQueryHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
......@@ -134,7 +135,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileId = -1;
}
TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList) {
TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
......@@ -147,6 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order);
pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qinfo = qinfo;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
......@@ -201,8 +203,8 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
return (TsdbQueryHandleT) pQueryHandle;
}
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList);
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
pQueryHandle->order = TSDB_ORDER_DESC;
......@@ -227,8 +229,8 @@ SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) {
return res;
}
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded
......@@ -303,6 +305,83 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
return true;
}
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
SDataRow rmem = NULL, rimem = NULL;
if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) {
rmem = SL_GET_NODE_DATA(node);
}
}
if (pCheckInfo->iiter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) {
rimem = SL_GET_NODE_DATA(node);
}
}
if (rmem != NULL && rimem != NULL) {
if (dataRowKey(rmem) < dataRowKey(rimem)) {
pCheckInfo->chosen = 0;
return rmem;
} else if (dataRowKey(rmem) == dataRowKey(rimem)) {
// data ts are duplicated, ignore the data in mem
tSkipListIterNext(pCheckInfo->iter);
pCheckInfo->chosen = 1;
return rimem;
} else {
pCheckInfo->chosen = 1;
return rimem;
}
}
if (rmem != NULL) {
pCheckInfo->chosen = 0;
return rmem;
}
if (rimem != NULL) {
pCheckInfo->chosen = 1;
return rimem;
}
return NULL;
}
bool moveToNextRow(STableCheckInfo* pCheckInfo) {
bool hasNext = false;
if (pCheckInfo->chosen == 0) {
if (pCheckInfo->iter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iter);
}
if (hasNext) {
return hasNext;
}
if (pCheckInfo->iiter != NULL) {
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
}
} else {
if (pCheckInfo->chosen == 1) {
if (pCheckInfo->iiter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iiter);
}
if (hasNext) {
return hasNext;
}
if (pCheckInfo->iter != NULL) {
return tSkipListIterGet(pCheckInfo->iter) != NULL;
}
}
}
return hasNext;
}
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
......@@ -312,31 +391,13 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
STable* pTable = pCheckInfo->pTableObj;
assert(pTable != NULL);
// no data in cache, abort
if (pTable->mem == NULL && pTable->imem == NULL) {
return false;
}
if (pCheckInfo->iter == NULL && pTable->mem) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
if (pCheckInfo->iter == NULL) {
return false;
}
if (!tSkipListIterNext(pCheckInfo->iter)) { // buffer is empty
return false;
}
}
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node == NULL) {
initTableMemIterator(pHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) {
return false;
}
SDataRow row = SL_GET_NODE_DATA(node);
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
tsdbTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
......@@ -349,7 +410,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
STimeWindow* win = &pHandle->cur.win;
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey,
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
// update the last key value
......@@ -382,7 +443,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
return fid;
}
static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0;
int32_t lastSlot = numOfBlocks - 1;
......@@ -448,7 +509,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) {
......@@ -522,7 +583,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
bool blockLoaded = false;
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
......@@ -540,9 +603,15 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
blockLoaded = true;
}
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->numOfRows != 0);
taosArrayDestroy(sa);
tfree(data);
int64_t et = taosGetTimestampUs() - st;
tsdbTrace("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);
return blockLoaded;
}
......@@ -600,7 +669,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
// do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, binfo.window.skey - step,
cur->rows = tsdbReadRowsFromCache(pCheckInfo, binfo.window.skey - step,
pQueryHandle->outputCapacity, &cur->win.skey, &cur->win.ekey, pQueryHandle);
pQueryHandle->realNumOfRows = cur->rows;
......@@ -649,7 +718,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
......@@ -1155,7 +1224,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
*numOfAllocBlocks = numOfBlocks;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
SBlockOrderSupporter sup = {0};
sup.numOfTables = numOfTables;
sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables);
......@@ -1192,15 +1261,26 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
pBlockInfo->compBlock = &pBlock[k];
pBlockInfo->pTableCheckInfo = pTableCheck;
// pBlockInfo->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index
// pBlockInfo->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table
cnt++;
}
numOfQualTables++;
}
tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables);
assert(numOfBlocks == cnt);
// since there is only one table qualified, blocks are not sorted
if (numOfQualTables == 1) {
memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
cleanBlockOrderSupporter(&sup, numOfQualTables);
tsdbTrace("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt,
pQueryHandle->qinfo);
return TSDB_CODE_SUCCESS;
}
tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt,
numOfQualTables, pQueryHandle->qinfo);
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualTables;
......@@ -1257,8 +1337,8 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
break;
}
tsdbTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId);
tsdbTrace("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
assert(numOfBlocks >= 0);
if (numOfBlocks == 0) {
......@@ -1565,19 +1645,22 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
}
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pQueryHandle) {
int numOfRows = 0;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
*skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs();
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj);
int32_t numOfTableCols = schemaNCols(pSchema);
do {
SSkipListNode* node = tSkipListIterGet(pIter);
if (node == NULL) {
SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) {
break;
}
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row);
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
......@@ -1590,49 +1673,69 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
}
if (*skey == INT64_MIN) {
*skey = dataRowKey(row);
*skey = key;
}
*ekey = dataRowKey(row);
int32_t offset = -1;
*ekey = key;
char* pData = NULL;
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pTable);
int32_t numOfTableCols = schemaNCols(pSchema);
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfTableCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pSchema->columns[j].colId < pColInfo->info.colId) {
j++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
for(int32_t j = 0; j < numOfTableCols; ++j) {
if (pColInfo->info.colId == pSchema->columns[j].colId) {
offset = pSchema->columns[j].offset;
break;
if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
} else {
memcpy(pData, value, pColInfo->info.bytes);
}
j++;
i++;
} else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
assert(offset != -1); // todo handle error
void *value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset);
}
while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
setVardataNull(pData, pColInfo->info.type);
} else {
memcpy(pData, value, pColInfo->info.bytes);
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
if (++numOfRows >= maxRowsToRead) {
tSkipListIterNext(pIter);
moveToNextRow(pCheckInfo);
break;
}
} while(tSkipListIterNext(pIter));
} while(moveToNextRow(pCheckInfo));
assert(numOfRows <= maxRowsToRead);
......@@ -1646,6 +1749,10 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
}
}
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbTrace("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
elapsedTime, numOfRows, numOfCols);
return numOfRows;
}
......
......@@ -111,6 +111,15 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
*/
void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key);
/**
* update the expire time of data in cache
* @param pCacheObj cache object
* @param key key
* @param expireTime new expire time of data
* @return
*/
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime);
/**
* Add one reference count for the exist data, and assign this data for a new owner.
* The new owner needs to invoke the taosCacheRelease when it does not need this data anymore.
......
......@@ -153,6 +153,8 @@ bool taosMbsToUcs4(char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len,
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes);
void taosRandStr(char* str, int32_t size);
int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosValidateEncodec(const char *encodec);
......
......@@ -92,12 +92,14 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
}
void* taosArrayGetP(const SArray* pArray, size_t index) {
void* ret = taosArrayGet(pArray, index);
if (ret == NULL) {
assert(index < pArray->size);
void* d = TARRAY_GET_ELEM(pArray, index);
if (d == NULL) {
return NULL;
}
return *(void**)ret;
return *(void**)d;
}
size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; }
......
......@@ -488,6 +488,35 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
return NULL;
}
uint32_t keyLen = (uint32_t)strlen(key);
__cache_rd_lock(pCacheObj);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
if (ptNode != NULL) {
T_REF_INC(*ptNode);
(*ptNode)->expiredTime = expireTime;
}
__cache_unlock(pCacheObj);
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uTrace("key:%s expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uTrace("key:%s not in cache, retrieved failed", key);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if (pCacheObj == NULL || data == NULL) return NULL;
......
......@@ -19,6 +19,7 @@
#include "tutil.h"
int taosGetFqdn(char *fqdn) {
int code = 0;
char hostname[1024];
hostname[1023] = '\0';
gethostname(hostname, 1023);
......@@ -27,13 +28,15 @@ int taosGetFqdn(char *fqdn) {
h = gethostbyname(hostname);
if (h != NULL) {
strcpy(fqdn, h->h_name);
return 0;
} else {
uError("failed to get host name");
return -1;
uError("failed to get host name(%s)", strerror(errno));
code = -1;
}
free(h);
// to do: free the resources
// free(h);
return code;
}
uint32_t taosGetIpFromFqdn(const char *fqdn) {
......@@ -47,7 +50,7 @@ uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20];
char ip[5];
strcpy(ip_addr_cpy, ip_addr);
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
char *s_start, *s_end;
s_start = ip_addr_cpy;
......@@ -206,7 +209,7 @@ int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
int reuse, nocheck;
int bufSize = 8192000;
uTrace("open udp socket:%s:%hu", ip, port);
uTrace("open udp socket:0x%x:%hu", ip, port);
memset((char *)&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
......@@ -257,7 +260,7 @@ int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
/* bind socket to local address */
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
uError("failed to bind udp socket: %d (%s), %s:%hu", errno, strerror(errno), ip, port);
uError("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
taosCloseSocket(sockFd);
return -1;
}
......@@ -363,7 +366,7 @@ int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
int sockFd;
int reuse;
uTrace("open tcp server socket:%s:%hu", ip, port);
uTrace("open tcp server socket:0x%x:%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sin_family = AF_INET;
......
......@@ -27,8 +27,6 @@
#include "tulog.h"
#include "taoserror.h"
int32_t tmpFileSerialNum = 0;
int32_t strdequote(char *z) {
if (z == NULL) {
return 0;
......@@ -433,12 +431,24 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
#else
char *tmpDir = "/tmp/";
#endif
int64_t ts = taosGetTimestampUs();
strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%d-%"PRIu64"-%u-%"PRIu64);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1), ts);
strcat(tmpPath, "-%d-%s");
char rand[8] = {0};
taosRandStr(rand, tListLen(rand) - 1);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
}
void taosRandStr(char* str, int32_t size) {
const char* set = "abcdefghijklmnopqrstuvwxyz0123456789-_.";
int32_t len = 39;
for(int32_t i = 0; i < size; ++i) {
str[i] = set[rand()%len];
}
}
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) {
......
......@@ -68,11 +68,19 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
pWal->num = 0;
pWal->level = pCfg->walLevel;
pWal->keep = pCfg->keep;
strcpy(pWal->path, path);
tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL);
if (access(path, F_OK) != 0) mkdir(path, 0755);
if (access(path, F_OK) != 0) {
if (mkdir(path, 0755) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal:%s, failed to create directory(%s)", path, strerror(errno));
pthread_mutex_destroy(&pWal->mutex);
free(pWal);
pWal = NULL;
}
}
if (pCfg->keep == 1) return pWal;
if (walHandleExistingFiles(path) == 0)
......@@ -80,7 +88,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
if (pWal->fd <0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal:%s, failed to open", path);
wError("wal:%s, failed to open(%s)", path, strerror(errno));
pthread_mutex_destroy(&pWal->mutex);
free(pWal);
pWal = NULL;
......@@ -119,7 +127,8 @@ void walClose(void *handle) {
int walRenew(void *handle) {
if (handle == NULL) return 0;
SWal *pWal = handle;
int code = 0;
terrno = 0;
pthread_mutex_lock(&pWal->mutex);
......@@ -135,8 +144,8 @@ int walRenew(void *handle) {
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) {
wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno));
code = -1;
wError("wal:%s, failed to open(%s)", pWal->name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
wTrace("wal:%s, it is created", pWal->name);
......@@ -156,14 +165,15 @@ int walRenew(void *handle) {
pthread_mutex_unlock(&pWal->mutex);
return code;
return terrno;
}
int walWrite(void *handle, SWalHead *pHead) {
SWal *pWal = handle;
int code = 0;
if (pWal == NULL) return -1;
terrno = 0;
// no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0;
......@@ -174,12 +184,12 @@ int walWrite(void *handle, SWalHead *pHead) {
if(write(pWal->fd, pHead, contLen) != contLen) {
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
code = -1;
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
pWal->version = pHead->version;
}
return code;
return terrno;
}
void walFsync(void *handle) {
......@@ -196,11 +206,11 @@ void walFsync(void *handle) {
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
SWal *pWal = handle;
int code = 0;
struct dirent *ent;
int count = 0;
uint32_t maxId = 0, minId = -1, index =0;
terrno = 0;
int plen = strlen(walPrefix);
char opath[TSDB_FILENAME_LEN+5];
......@@ -224,30 +234,30 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
closedir(dir);
if (count == 0) {
if (pWal->keep) code = walRenew(pWal);
return code;
if (pWal->keep) terrno = walRenew(pWal);
return terrno;
}
if ( count != (maxId-minId+1) ) {
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
code = -1;
terrno = TAOS_SYSTEM_ERROR(TSDB_CODE_APP_ERROR);
} else {
wTrace("wal:%s, %d files will be restored", opath, count);
for (index = minId; index<=maxId; ++index) {
sprintf(pWal->name, "%s/%s%d", opath, walPrefix, index);
code = walRestoreWalFile(pWal, pVnode, writeFp);
if (code < 0) break;
terrno = walRestoreWalFile(pWal, pVnode, writeFp);
if (terrno < 0) break;
}
}
if (code == 0) {
if (terrno == 0) {
if (pWal->keep == 0) {
code = walRemoveWalFiles(opath);
if (code == 0) {
terrno = walRemoveWalFiles(opath);
if (terrno == 0) {
if (remove(opath) < 0) {
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
code = -1;
terrno = TAOS_SYSTEM_ERROR(errno);
}
}
} else {
......@@ -258,12 +268,12 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) {
wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
code = -1;
terrno = TAOS_SYSTEM_ERROR(errno);
}
}
}
return code;
return terrno;
}
int walGetWalFile(void *handle, char *name, uint32_t *index) {
......@@ -292,40 +302,47 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
}
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
int code = 0;
char *name = pWal->name;
terrno = 0;
char *buffer = malloc(1024000); // size for one record
if (buffer == NULL) return -1;
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
SWalHead *pHead = (SWalHead *)buffer;
int fd = open(name, O_RDONLY);
if (fd < 0) {
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
free(buffer);
return -1;
return terrno;
}
wTrace("wal:%s, start to restore", name);
while (1) {
int ret = read(fd, pHead, sizeof(SWalHead));
if ( ret == 0) { code = 0; break;}
if ( ret == 0) break;
if (ret != sizeof(SWalHead)) {
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
ret = read(fd, pHead->cont, pHead->len);
if ( ret != pHead->len) {
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
......@@ -336,11 +353,10 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
close(fd);
free(buffer);
return code;
return terrno;
}
int walHandleExistingFiles(const char *path) {
int code = 0;
char oname[TSDB_FILENAME_LEN * 3];
char nname[TSDB_FILENAME_LEN * 3];
char opath[TSDB_FILENAME_LEN];
......@@ -350,6 +366,7 @@ int walHandleExistingFiles(const char *path) {
struct dirent *ent;
DIR *dir = opendir(path);
int plen = strlen(walPrefix);
terrno = 0;
if (access(opath, F_OK) == 0) {
// old directory is there, it means restore process is not finished
......@@ -360,13 +377,19 @@ int walHandleExistingFiles(const char *path) {
int count = 0;
while ((ent = readdir(dir))!= NULL) {
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
if (access(opath, F_OK) != 0) mkdir(opath, 0755);
sprintf(oname, "%s/%s", path, ent->d_name);
sprintf(nname, "%s/old/%s", path, ent->d_name);
if (access(opath, F_OK) != 0) {
if (mkdir(opath, 0755) != 0) {
wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
}
if (rename(oname, nname) < 0) {
wError("wal:%s, failed to move to new:%s", oname, nname);
code = -1;
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
......@@ -378,14 +401,14 @@ int walHandleExistingFiles(const char *path) {
}
closedir(dir);
return code;
return terrno;
}
static int walRemoveWalFiles(const char *path) {
int plen = strlen(walPrefix);
char name[TSDB_FILENAME_LEN * 3];
int code = 0;
terrno = 0;
if (access(path, F_OK) != 0) return 0;
struct dirent *ent;
......@@ -396,13 +419,13 @@ static int walRemoveWalFiles(const char *path) {
sprintf(name, "%s/%s", path, ent->d_name);
if (remove(name) <0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
code = -1; break;
terrno = TAOS_SYSTEM_ERROR(errno);
}
}
}
closedir(dir);
return code;
return terrno;
}
......@@ -87,7 +87,7 @@ class TDTestCase:
# <> for timestamp type
tdSql.query("select * from db.st where ts <> '2020-05-13 10:00:00.002'")
#tdSql.checkRows(4)
# tdSql.checkRows(4)
# <> for numeric type
tdSql.query("select * from db.st where tagtype <> 2")
......@@ -96,7 +96,7 @@ class TDTestCase:
# <> for nchar type
tdSql.query("select * from db.st where name <> 'first'")
tdSql.checkRows(4)
# % for nchar type
tdSql.query("select * from db.st where name like 'fi%'")
tdSql.checkRows(2)
......
......@@ -42,14 +42,17 @@ class TDTestCase:
('2020-05-13 10:00:00.005', 3, 'third')""")
# query with filter condition A OR condition B
tdSql.query("select * from db.st where ts > '2020-05-13 10:00:00.002' AND tagtype < 2")
tdSql.query(
"select * from db.st where ts > '2020-05-13 10:00:00.002' AND tagtype < 2")
tdSql.checkRows(1)
# query with filter condition A OR condition B, error expected
tdSql.error("select * from db.st where ts > '2020-05-13 10:00:00.002' OR tagtype < 2")
tdSql.error(
"select * from db.st where ts > '2020-05-13 10:00:00.002' OR tagtype < 2")
# illegal condition
tdSql.error("select * from db.st where ts != '2020-05-13 10:00:00.002' OR tagtype < 2")
tdSql.error(
"select * from db.st where ts != '2020-05-13 10:00:00.002' OR tagtype < 2")
tdSql.error("select * from db.st where tagtype <> 1 OR tagtype < 2")
def stop(self):
......
......@@ -41,18 +41,17 @@ class TDTestCase:
('2020-05-13 10:00:00.002', 3, 'third') dev_002 VALUES('2020-05-13 10:00:00.003', 1, 'first'), ('2020-05-13 10:00:00.004', 2, 'second'),
('2020-05-13 10:00:00.005', 3, 'third')""")
# query first .. as ..
tdSql.error("select first(*) as one from st")
tdSql.error("select first(*) as one from st")
# query last .. as ..
tdSql.error("select last(*) as latest from st")
tdSql.error("select last(*) as latest from st")
# query last row .. as ..
tdSql.error("select last_row as latest from st")
tdSql.error("select last_row as latest from st")
# query distinct on normal colnum
tdSql.error("select distinct tagtype from st")
tdSql.error("select distinct tagtype from st")
# query .. order by non-time field
tdSql.error("select * from st order by name")
......
......@@ -28,17 +28,20 @@ class TDTestCase:
print("==============step1")
tdSql.execute("create table stb1 (ts timestamp, c1 int, c2 float) tags(t1 int, t2 binary(10), t3 nchar(10))")
tdSql.execute("insert into tb1 using stb1 tags(1,'tb1', '表1') values ('2020-04-18 15:00:00.000', 1, 0.1), ('2020-04-18 15:00:01.000', 2, 0.1)")
tdSql.execute("insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)")
# inner join --- bug
tdSql.execute(
"create table stb1 (ts timestamp, c1 int, c2 float) tags(t1 int, t2 binary(10), t3 nchar(10))")
tdSql.execute(
"insert into tb1 using stb1 tags(1,'tb1', '表1') values ('2020-04-18 15:00:00.000', 1, 0.1), ('2020-04-18 15:00:01.000', 2, 0.1)")
tdSql.execute(
"insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)")
# inner join --- bug
tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts")
tdSql.checkRows(1)
# join 3 tables -- bug exists
tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_p.id, stb_p.dscrption, stb_p.pressure,stb_v.velocity from stb_p, stb_t, stb_v where stb_p.ts=stb_t.ts and stb_p.ts=stb_v.ts and stb_p.id = stb_t.id")
# query show stable
tdSql.query("show stables")
tdSql.checkRows(1)
......@@ -51,15 +54,15 @@ class TDTestCase:
tdSql.query("select count(*) from stb1")
tdSql.checkData(0, 0, 4)
# query first
# query first
tdSql.query("select first(*) from stb1")
tdSql.checkData(0, 1, 1)
# query last
# query last
tdSql.query("select last(*) from stb1")
tdSql.checkData(0, 1, 4)
# query last_row
# query last_row
tdSql.query("select last_row(*) from stb1")
tdSql.checkData(0, 1, 4)
......@@ -70,7 +73,7 @@ class TDTestCase:
# query first ... as
tdSql.query("select first(*) as begin from stb1")
tdSql.checkData(0, 1, 1)
# query last ... as
tdSql.query("select last(*) as end from stb1")
tdSql.checkData(0, 1, 4)
......@@ -93,7 +96,7 @@ class TDTestCase:
# query ... alias for table ---- bug
tdSql.query("select t.ts from tb1 t")
tdSql.checkRows(2)
tdSql.checkRows(2)
# query ... tbname
tdSql.query("select tbname from stb1")
......
......@@ -111,7 +111,6 @@ class Test (threading.Thread):
last_tb)
written = written + 1
def drop_stable(self):
tdLog.info("drop_stable")
global last_stb
......@@ -152,7 +151,6 @@ class Test (threading.Thread):
last_tb = ""
written = 0
def query_data_from_stable(self):
tdLog.info("query_data_from_stable")
global last_stb
......@@ -164,7 +162,6 @@ class Test (threading.Thread):
tdLog.info("will query data from super table")
tdSql.execute('select * from %s' % last_stb)
def reset_query_cache(self):
tdLog.info("reset_query_cache")
global last_tb
......@@ -232,7 +229,7 @@ class Test (threading.Thread):
self.threadLock.acquire()
tdLog.notice("first thread")
randDataOp = random.randint(1, 3)
dataOp.get(randDataOp , lambda: "ERROR")()
dataOp.get(randDataOp, lambda: "ERROR")()
self.threadLock.release()
elif (self.threadId == 2):
......
......@@ -111,7 +111,6 @@ class Test (threading.Thread):
last_tb)
written = written + 1
def drop_stable(self):
tdLog.info("drop_stable")
global last_stb
......@@ -154,7 +153,6 @@ class Test (threading.Thread):
last_tb = ""
written = 0
def query_data_from_stable(self):
tdLog.info("query_data_from_stable")
global last_stb
......@@ -166,7 +164,6 @@ class Test (threading.Thread):
tdLog.info("will query data from super table")
tdSql.execute('select * from %s' % last_stb)
def reset_query_cache(self):
tdLog.info("reset_query_cache")
global last_tb
......@@ -230,7 +227,7 @@ class Test (threading.Thread):
self.threadLock.acquire()
tdLog.notice("first thread")
randDataOp = random.randint(1, 3)
dataOp.get(randDataOp , lambda: "ERROR")()
dataOp.get(randDataOp, lambda: "ERROR")()
self.threadLock.release()
elif (self.threadId == 2):
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册