未验证 提交 486df12e 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #11359 from taosdata/feature/TS-238-D

feat(tsdb): delete time series data
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCDELETE_H
#define TDENGINE_TSCDELETE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "qTableMeta.h"
int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCDELETE_H
...@@ -54,7 +54,8 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs); ...@@ -54,7 +54,8 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void tscFreeRetrieveSup(void **param); void tscFreeRetrieveSup(void **param);
SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj, __async_cb_func_t fp, int32_t cmd);
void doConcurrentlySendSubQueries(SSqlObj* pSql);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -319,6 +319,7 @@ typedef struct { ...@@ -319,6 +319,7 @@ typedef struct {
TAOS_FIELD* final; TAOS_FIELD* final;
struct SGlobalMerger *pMerger; struct SGlobalMerger *pMerger;
int32_t numOfTables;
} SSqlRes; } SSqlRes;
typedef struct { typedef struct {
...@@ -498,6 +499,7 @@ void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *para ...@@ -498,6 +499,7 @@ void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *para
void tscImportDataFromFile(SSqlObj *pSql); void tscImportDataFromFile(SSqlObj *pSql);
struct SGlobalMerger* tscInitResObjForLocalQuery(int32_t numOfRes, int32_t rowLen, uint64_t id); struct SGlobalMerger* tscInitResObjForLocalQuery(int32_t numOfRes, int32_t rowLen, uint64_t id);
bool tscIsUpdateQuery(SSqlObj* pSql); bool tscIsUpdateQuery(SSqlObj* pSql);
bool tscIsDeleteQuery(SSqlObj* pSql);
char* tscGetSqlStr(SSqlObj* pSql); char* tscGetSqlStr(SSqlObj* pSql);
bool tscIsQueryWithLimit(SSqlObj* pSql); bool tscIsQueryWithLimit(SSqlObj* pSql);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tcmdtype.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscDelete.h"
#include "tscSubquery.h"
void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo);
//
// handle error
//
void tscHandleSubDeleteError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
}
//
// sub delete sql callback
//
void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) {
// the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
// function while kill query by a user.
if (param == NULL) {
assert(code != TSDB_CODE_SUCCESS);
return;
}
SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
SSqlObj* pParentSql = trsupport->pParentSql;
SSqlObj* pSql = (SSqlObj *) tres;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0);
SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
//
// stable query killed or other subquery failed, all query stopped
//
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscError("0x%"PRIx64" query cancelled or failed, sub:0x%"PRIx64", vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql->self, pSql->self, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
tfree(pSql->param);
return;
}
/*
* if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
* than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
* function to abort current and remain retrieve process.
*
* NOTE: thread safe is required.
*/
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
tscError(":CDEL 0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
tscHandleSubDeleteError(param, tres, pParentSql->res.code);
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
tfree(pSql->param);
return;
}
// record
tscInfo("0x%"PRIx64":CDEL sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve row(s)=%d tables(s)=%d", trsupport->pParentSql->self,
pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pSql->res.numOfRows, pSql->res.numOfTables);
// success do total count
SSubqueryState *subState = &pParentSql->subState;
pthread_mutex_lock(&subState->mutex);
pParentSql->res.numOfRows += pSql->res.numOfRows;
pParentSql->res.numOfTables += pSql->res.numOfTables;
pthread_mutex_unlock(&subState->mutex);
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
tfree(pSql->param);
return ;
}
void writeMsgVgId(char * payload, int32_t vgId) {
SSubmitMsg* pSubmitMsg = (SSubmitMsg *)(payload + sizeof(SMsgDesc));
// SSubmitMsg
pSubmitMsg->header.vgId = htonl(vgId);
}
//
// STable malloc sub delete
//
SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) {
// Init
SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->rootObj = pSql->rootObj;
pNew->fp = tscSubDeleteCallback;
pNew->fetchFp = tscSubDeleteCallback;
pNew->param = trsupport;
pNew->maxRetry = TSDB_MAX_REPLICA;
SSqlCmd* pNewCmd = &pNew->cmd;
memcpy(pNewCmd, pCmd, sizeof(SSqlCmd));
// set zero
pNewCmd->pQueryInfo = NULL;
pNewCmd->active = NULL;
pNewCmd->payload = NULL;
pNewCmd->allocSize = 0;
// payload copy
int32_t ret = tscAllocPayload(pNewCmd, pCmd->payloadLen);
if (ret != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64":CDEL , sub delete alloc payload failed. errcode=%d", pSql->self, ret);
free(pNew);
return NULL;
}
memcpy(pNewCmd->payload, pCmd->payload, pCmd->payloadLen);
// update vgroup id
writeMsgVgId(pNewCmd->payload ,pVgroupMsg->vgId);
tsem_init(&pNew->rspSem, 0, 0);
registerSqlObj(pNew);
tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew);
SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pVgroupMsg->vgId, sizeof(pVgroupMsg->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pNew->epSet, &vgroupInfo);
return pNew;
}
//
// execute delete sql
//
int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
int32_t ret = TSDB_CODE_SUCCESS;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if(!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// not super table
pSql->cmd.active = pQueryInfo;
return tscBuildAndSendRequest(pSql, pQueryInfo);
}
//
// super table
//
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
// check cancel
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function.
return pRes->code;
}
if(pTableMetaInfo->vgroupList == NULL) {
tscError(":CDEL SQL:%p tablename=%s vgroupList is NULL.", pSql, pTableMetaInfo->name.tname);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
ret = doInitSubState(pSql, numOfSub);
if (ret != 0) {
tscAsyncResultOnError(pSql);
return ret;
}
tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
pRes->code = TSDB_CODE_SUCCESS;
int32_t i;
for (i = 0; i < pState->numOfSub; ++i) {
// vgroup
SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i];
// malloc each support
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
break;
}
trs->subqueryIndex = i;
trs->pParentSql = pSql;
// malloc sub SSqlObj
SSqlObj *pNew = tscCreateSTableSubDelete(pSql, pVgroupMsg, trs);
if (pNew == NULL) {
tscError("0x%"PRIx64"CDEL failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs);
break;
}
pSql->pSubs[i] = pNew;
}
if (i < pState->numOfSub) {
tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
doCleanupSubqueries(pSql, i);
return pRes->code;
}
// send sub sql
doConcurrentlySendSubQueries(pSql);
//return TSDB_CODE_TSC_QUERY_CANCELLED;
return TSDB_CODE_SUCCESS;
}
...@@ -1153,7 +1153,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -1153,7 +1153,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
code = validateTableName(tableToken.z, tableToken.n, &sTblToken, &dbIncluded2); code = validateTableName(tableToken.z, tableToken.n, &sTblToken, &dbIncluded2);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(pInsertParam->msg, "invalid table name", *sqlstr); return tscInvalidOperationMsg(pInsertParam->msg, STR_INVALID_TABLE_NAME, *sqlstr);
} }
int32_t ret = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded2); int32_t ret = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded2);
...@@ -1441,7 +1441,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1441,7 +1441,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
bool dbIncluded = false; bool dbIncluded = false;
// Check if the table name available or not // Check if the table name available or not
if (validateTableName(sToken.z, sToken.n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) { if (validateTableName(sToken.z, sToken.n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
code = tscInvalidOperationMsg(pInsertParam->msg, "table name invalid", sToken.z); code = tscInvalidOperationMsg(pInsertParam->msg, STR_INVALID_TABLE_NAME, sToken.z);
goto _clean; goto _clean;
} }
......
...@@ -100,7 +100,7 @@ static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS ...@@ -100,7 +100,7 @@ static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery, bool timeWindowQuery); static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery, bool timeWindowQuery);
static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery); static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery, bool delData);
static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t validateRangeNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); static int32_t validateRangeNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema); static int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema);
...@@ -761,8 +761,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -761,8 +761,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
case TSDB_SQL_DESCRIBE_TABLE: { case TSDB_SQL_DESCRIBE_TABLE: {
const char* msg1 = "invalid table name";
SStrToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); SStrToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
bool dbIncluded = false; bool dbIncluded = false;
char buf[TSDB_TABLE_FNAME_LEN]; char buf[TSDB_TABLE_FNAME_LEN];
...@@ -770,7 +768,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -770,7 +768,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
sTblToken.z = buf; sTblToken.z = buf;
if (validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) { if (validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
// additional msg has been attached already // additional msg has been attached already
...@@ -783,8 +781,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -783,8 +781,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
case TSDB_SQL_SHOW_CREATE_STABLE: case TSDB_SQL_SHOW_CREATE_STABLE:
case TSDB_SQL_SHOW_CREATE_TABLE: { case TSDB_SQL_SHOW_CREATE_TABLE: {
const char* msg1 = "invalid table name";
SStrToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); SStrToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
bool dbIncluded = false; bool dbIncluded = false;
...@@ -793,7 +789,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -793,7 +789,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
sTblToken.z = buf; sTblToken.z = buf;
if (validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) { if (validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded); code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded);
...@@ -1052,6 +1048,47 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -1052,6 +1048,47 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
break; break;
} }
case TSDB_SQL_DELETE_DATA: {
// CHECK AND SET TABLE NAME
SStrToken* tbName = &pInfo->pDelData->tableName;
bool dbIncluded = false;
char buf[TSDB_TABLE_FNAME_LEN];
SStrToken sTblToken;
sTblToken.z = buf;
// check
if (validateTableName(tbName->z, tbName->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
// set
code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// get table meta
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code ; // async load table meta
}
// vgroupInfo if super
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
}
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return code;
}
// CHECK AND SET WHERE
if (pInfo->pDelData->pWhere) {
// origin check
pQueryInfo = tscGetQueryInfo(pCmd);
if (validateWhereNode(pQueryInfo, &pInfo->pDelData->pWhere, pSql, false, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
break;
}
default: default:
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression"); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
} }
...@@ -5599,12 +5636,13 @@ void convertWhereStringCharset(tSqlExpr* pRight){ ...@@ -5599,12 +5636,13 @@ void convertWhereStringCharset(tSqlExpr* pRight){
static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr, static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr,
int32_t* type, int32_t* tbIdx, int32_t parentOptr, tSqlExpr** columnExpr, int32_t* type, int32_t* tbIdx, int32_t parentOptr, tSqlExpr** columnExpr,
tSqlExpr** tsExpr, bool joinQuery) { tSqlExpr** tsExpr, bool joinQuery, bool delData) {
const char* msg1 = "table query cannot use tags filter"; const char* msg1 = "table query cannot use tags filter";
const char* msg2 = "illegal column name"; const char* msg2 = "illegal column name";
const char* msg4 = "too many join tables"; const char* msg4 = "too many join tables";
const char* msg5 = "not support ordinary column join"; const char* msg5 = "not support ordinary column join";
const char* msg6 = "illegal condition expression"; const char* msg6 = "illegal condition expression";
const char* msg7 = "only allow first timestamp column and tag column";
tSqlExpr* pLeft = (*pExpr)->pLeft; tSqlExpr* pLeft = (*pExpr)->pLeft;
tSqlExpr* pRight = (*pExpr)->pRight; tSqlExpr* pRight = (*pExpr)->pRight;
...@@ -5629,6 +5667,16 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql ...@@ -5629,6 +5667,16 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
// delete where condition check , column must ts or tag
if (delData) {
if (!((pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) ||
index.columnIndex >= tscGetNumOfColumns(pTableMeta) ||
index.columnIndex == TSDB_TBNAME_COLUMN_INDEX)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
}
// validate the null expression // validate the null expression
int32_t code = validateNullExpr(*pExpr, pTableMeta, index.columnIndex, tscGetErrorMsgPayload(pCmd)); int32_t code = validateNullExpr(*pExpr, pTableMeta, index.columnIndex, tscGetErrorMsgPayload(pCmd));
...@@ -5648,8 +5696,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql ...@@ -5648,8 +5696,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
return code; return code;
} }
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
if (pSchema->type == TSDB_DATA_TYPE_NCHAR){ if (pSchema->type == TSDB_DATA_TYPE_NCHAR){
convertWhereStringCharset(pRight); convertWhereStringCharset(pRight);
} }
...@@ -5798,7 +5844,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql ...@@ -5798,7 +5844,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr, int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr,
int32_t* type, int32_t* tbIdx, int32_t parentOptr, tSqlExpr** columnExpr, int32_t* type, int32_t* tbIdx, int32_t parentOptr, tSqlExpr** columnExpr,
tSqlExpr** tsExpr, bool joinQuery) { tSqlExpr** tsExpr, bool joinQuery, bool delData) {
if (pExpr == NULL) { if (pExpr == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5830,12 +5876,12 @@ int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr ...@@ -5830,12 +5876,12 @@ int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr
int32_t rightTbIdx = 0; int32_t rightTbIdx = 0;
if (!tSqlExprIsParentOfLeaf(*pExpr)) { if (!tSqlExprIsParentOfLeaf(*pExpr)) {
ret = getQueryCondExpr(pCmd, pQueryInfo, &(*pExpr)->pLeft, pCondExpr, type ? &leftType : NULL, &leftTbIdx, (*pExpr)->tokenId, &columnLeft, &tsLeft, joinQuery); ret = getQueryCondExpr(pCmd, pQueryInfo, &(*pExpr)->pLeft, pCondExpr, type ? &leftType : NULL, &leftTbIdx, (*pExpr)->tokenId, &columnLeft, &tsLeft, joinQuery, delData);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
goto err_ret; goto err_ret;
} }
ret = getQueryCondExpr(pCmd, pQueryInfo, &(*pExpr)->pRight, pCondExpr, type ? &rightType : NULL, &rightTbIdx, (*pExpr)->tokenId, &columnRight, &tsRight, joinQuery); ret = getQueryCondExpr(pCmd, pQueryInfo, &(*pExpr)->pRight, pCondExpr, type ? &rightType : NULL, &rightTbIdx, (*pExpr)->tokenId, &columnRight, &tsRight, joinQuery, delData);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
goto err_ret; goto err_ret;
} }
...@@ -5890,7 +5936,7 @@ int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr ...@@ -5890,7 +5936,7 @@ int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr
goto err_ret; goto err_ret;
} }
ret = handleExprInQueryCond(pCmd, pQueryInfo, pExpr, pCondExpr, type, tbIdx, parentOptr, columnExpr, tsExpr, joinQuery); ret = handleExprInQueryCond(pCmd, pQueryInfo, pExpr, pCondExpr, type, tbIdx, parentOptr, columnExpr, tsExpr, joinQuery, delData);
if (ret) { if (ret) {
goto err_ret; goto err_ret;
} }
...@@ -6437,13 +6483,13 @@ _ret: ...@@ -6437,13 +6483,13 @@ _ret:
int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery) { int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery, bool delData) {
if (pExpr == NULL) { if (pExpr == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
const char* msg1 = "invalid expression"; const char* msg1 = "invalid expression";
// const char* msg2 = "invalid filter expression"; //const char* msg2 = "the timestamp column condition must be in an interval";
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
...@@ -6465,7 +6511,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq ...@@ -6465,7 +6511,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
} }
#endif #endif
if ((ret = getQueryCondExpr(&pSql->cmd, pQueryInfo, pExpr, &condExpr, etype, &tbIdx, (*pExpr)->tokenId, &condExpr.pColumnCond, &condExpr.pTimewindow, joinQuery)) != TSDB_CODE_SUCCESS) { if ((ret = getQueryCondExpr(&pSql->cmd, pQueryInfo, pExpr, &condExpr, etype, &tbIdx, (*pExpr)->tokenId, &condExpr.pColumnCond, &condExpr.pTimewindow, joinQuery, delData)) != TSDB_CODE_SUCCESS) {
goto PARSE_WHERE_EXIT; goto PARSE_WHERE_EXIT;
} }
...@@ -6494,6 +6540,11 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq ...@@ -6494,6 +6540,11 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
goto PARSE_WHERE_EXIT; goto PARSE_WHERE_EXIT;
} }
// check timestamp range
//if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
// return invalidOperationMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
//}
// get the tag query condition // get the tag query condition
if ((ret = getTagQueryCondExpr(&pSql->cmd, pQueryInfo, &condExpr)) != TSDB_CODE_SUCCESS) { if ((ret = getTagQueryCondExpr(&pSql->cmd, pQueryInfo, &condExpr)) != TSDB_CODE_SUCCESS) {
goto PARSE_WHERE_EXIT; goto PARSE_WHERE_EXIT;
...@@ -8998,7 +9049,6 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p ...@@ -8998,7 +9049,6 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
} }
int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg1 = "invalid table name";
const char* msg3 = "tag value too long"; const char* msg3 = "tag value too long";
const char* msg4 = "illegal value or data overflow"; const char* msg4 = "illegal value or data overflow";
const char* msg5 = "tags number not matched"; const char* msg5 = "tags number not matched";
...@@ -9034,7 +9084,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -9034,7 +9084,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t code = validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded); int32_t code = validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
code = tscSetTableFullName(&pStableMetaInfo->name, &sTblToken, pSql, dbIncluded); code = tscSetTableFullName(&pStableMetaInfo->name, &sTblToken, pSql, dbIncluded);
...@@ -9241,7 +9291,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -9241,7 +9291,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
bool dbIncluded2 = false; bool dbIncluded2 = false;
// table name // table name
if (tscValidateName(&(pCreateTableInfo->name), true, &dbIncluded2) != TSDB_CODE_SUCCESS) { if (tscValidateName(&(pCreateTableInfo->name), true, &dbIncluded2) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);
...@@ -9253,7 +9303,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -9253,7 +9303,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
pCreateTableInfo->fullname = calloc(1, tNameLen(&pTableMetaInfo->name) + 1); pCreateTableInfo->fullname = calloc(1, tNameLen(&pTableMetaInfo->name) + 1);
ret = tNameExtractFullName(&pTableMetaInfo->name, pCreateTableInfo->fullname); ret = tNameExtractFullName(&pTableMetaInfo->name, pCreateTableInfo->fullname);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
} }
...@@ -9261,7 +9311,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -9261,7 +9311,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
} }
int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg1 = "invalid table name";
const char* msg2 = "functions not allowed in CQ"; const char* msg2 = "functions not allowed in CQ";
const char* msg3 = "fill only available for interval query"; const char* msg3 = "fill only available for interval query";
const char* msg4 = "fill option not supported in stream computing"; const char* msg4 = "fill option not supported in stream computing";
...@@ -9285,14 +9334,14 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -9285,14 +9334,14 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
bool dbIncluded1 = false; bool dbIncluded1 = false;
if (tscValidateName(pName, true, &dbIncluded1) != TSDB_CODE_SUCCESS) { if (tscValidateName(pName, true, &dbIncluded1) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
// check to valid and create to name // check to valid and create to name
if(pInfo->pCreateTableInfo->to.n > 0) { if(pInfo->pCreateTableInfo->to.n > 0) {
bool dbInclude = false; bool dbInclude = false;
if (tscValidateName(&pInfo->pCreateTableInfo->to, false, &dbInclude) != TSDB_CODE_SUCCESS) { if (tscValidateName(&pInfo->pCreateTableInfo->to, false, &dbInclude) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
int32_t code = tscSetTableFullName(&pInfo->pCreateTableInfo->toSName, &pInfo->pCreateTableInfo->to, pSql, dbInclude); int32_t code = tscSetTableFullName(&pInfo->pCreateTableInfo->toSName, &pInfo->pCreateTableInfo->to, pSql, dbInclude);
if(code != TSDB_CODE_SUCCESS) { if(code != TSDB_CODE_SUCCESS) {
...@@ -9319,7 +9368,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -9319,7 +9368,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t code = validateTableName(srcToken.z, srcToken.n, &sTblToken, &dbIncluded2); int32_t code = validateTableName(srcToken.z, srcToken.n, &sTblToken, &dbIncluded2);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded2); code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded2);
...@@ -9339,7 +9388,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -9339,7 +9388,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1); int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
if (pSqlNode->pWhere != NULL) { // query condition in stream computing if (pSqlNode->pWhere != NULL) { // query condition in stream computing
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery) != TSDB_CODE_SUCCESS) { if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
} }
...@@ -9751,7 +9800,6 @@ int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* p ...@@ -9751,7 +9800,6 @@ int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* p
} }
static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, char* msgBuf, SSqlObj* pSql) { static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, char* msgBuf, SSqlObj* pSql) {
const char* msg1 = "invalid table name";
int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list); int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list);
assert(pSqlNode->from->type == SQL_NODE_FROM_TABLELIST); assert(pSqlNode->from->type == SQL_NODE_FROM_TABLELIST);
...@@ -9761,7 +9809,7 @@ static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList ...@@ -9761,7 +9809,7 @@ static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList
SStrToken* t = &item->tableName; SStrToken* t = &item->tableName;
if (t->type == TK_INTEGER || t->type == TK_FLOAT) { if (t->type == TK_INTEGER || t->type == TK_FLOAT) {
return invalidOperationMsg(msgBuf, msg1); return invalidOperationMsg(msgBuf, STR_INVALID_TABLE_NAME);
} }
bool dbIncluded = false; bool dbIncluded = false;
...@@ -9770,7 +9818,7 @@ static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList ...@@ -9770,7 +9818,7 @@ static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList
sTblToken.z = buf; sTblToken.z = buf;
if (validateTableName(t->z, t->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) { if (validateTableName(t->z, t->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(msgBuf, msg1); return invalidOperationMsg(msgBuf, STR_INVALID_TABLE_NAME);
} }
SName name = {0}; SName name = {0};
...@@ -10029,7 +10077,6 @@ _end: ...@@ -10029,7 +10077,6 @@ _end:
} }
static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, int32_t numOfTables) { static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, int32_t numOfTables) {
const char* msg1 = "invalid table name";
const char* msg2 = "invalid table alias name"; const char* msg2 = "invalid table alias name";
const char* msg3 = "alias name too long"; const char* msg3 = "alias name too long";
const char* msg4 = "self join not allowed"; const char* msg4 = "self join not allowed";
...@@ -10050,7 +10097,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod ...@@ -10050,7 +10097,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
SStrToken *oriName = &item->tableName; SStrToken *oriName = &item->tableName;
if (oriName->type == TK_INTEGER || oriName->type == TK_FLOAT) { if (oriName->type == TK_INTEGER || oriName->type == TK_FLOAT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
bool dbIncluded = false; bool dbIncluded = false;
...@@ -10059,7 +10106,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod ...@@ -10059,7 +10106,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
sTblToken.z = buf; sTblToken.z = buf;
if (validateTableName(oriName->z, oriName->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) { if (validateTableName(oriName->z, oriName->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
} }
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
...@@ -10328,7 +10375,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -10328,7 +10375,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
// validate the query filter condition info // validate the query filter condition info
if (pSqlNode->pWhere != NULL) { if (pSqlNode->pWhere != NULL) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery) != TSDB_CODE_SUCCESS) { if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
} else { } else {
...@@ -10433,7 +10480,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -10433,7 +10480,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->onlyHasTagCond = true; pQueryInfo->onlyHasTagCond = true;
// set where info // set where info
if (pSqlNode->pWhere != NULL) { if (pSqlNode->pWhere != NULL) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery) != TSDB_CODE_SUCCESS) { if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
......
...@@ -130,7 +130,7 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { ...@@ -130,7 +130,7 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
taosCorEndWrite(&pCorEpSet->version); taosCorEndWrite(&pCorEpSet->version);
} }
static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo) { void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgroupInfo) {
if (pVgroupInfo == NULL) { return;} if (pVgroupInfo == NULL) { return;}
int8_t inUse = pVgroupInfo->inUse; int8_t inUse = pVgroupInfo->inUse;
pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0;
...@@ -523,9 +523,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -523,9 +523,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
pMsg->numOfRows = htonl(pMsg->numOfRows); pMsg->numOfRows = htonl(pMsg->numOfRows);
pMsg->affectedRows = htonl(pMsg->affectedRows); pMsg->affectedRows = htonl(pMsg->affectedRows);
pMsg->failedRows = htonl(pMsg->failedRows); pMsg->failedRows = htonl(pMsg->failedRows);
pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks); pMsg->numOfTables = htonl(pMsg->numOfTables);
pRes->numOfRows += pMsg->affectedRows; pRes->numOfRows += pMsg->affectedRows;
if(pMsg->numOfTables > 0) {
pRes->numOfTables = pMsg->numOfTables;
}
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command], tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
} else { } else {
...@@ -619,7 +622,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) { ...@@ -619,7 +622,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
} }
tscDebug("0x%"PRIx64" SQL cmd:%s will be processed, name:%s, type:%d", pSql->self, sqlCmd[pCmd->command], name, type); tscDebug("0x%"PRIx64" SQL cmd:%s will be processed, name:%s, type:%d", pSql->self, sqlCmd[pCmd->command], name, type);
if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL if (pCmd->command < TSDB_SQL_MGMT && pCmd->command != TSDB_SQL_DELETE_DATA) { // the pTableMetaInfo cannot be NULL
if (pTableMetaInfo == NULL) { if (pTableMetaInfo == NULL) {
pSql->res.code = TSDB_CODE_TSC_APP_ERROR; pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
return pSql->res.code; return pSql->res.code;
...@@ -3313,10 +3316,91 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { ...@@ -3313,10 +3316,91 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
return code; return code;
} }
int tscBuildDelDataMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
uint32_t command = CMD_DELETE_DATA;
// pSql->cmd.payloadLen is set during copying data into payload
pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
// super table to do
command |= FLAG_SUPER_TABLE;
} else {
// no super table to do copy epSet
SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
tscDebug("0x%"PRIx64" table deldata submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps);
}
SCond *pCond = NULL;
// serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
}
// set payload
int32_t tagCondLen = 0;
if (pCond) {
tagCondLen = pCond->len;
}
int32_t payloadLen = sizeof(SMsgDesc) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + sizeof(SControlData) + tagCondLen;
int32_t ret = tscAllocPayload(pCmd, payloadLen);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
pCmd->payloadLen = payloadLen;
char* p = pCmd->payload;
SMsgDesc* pMsgDesc = (SMsgDesc* )p;
p += sizeof(SMsgDesc);
SSubmitMsg* pSubmitMsg = (SSubmitMsg* )p;
p += sizeof(SSubmitMsg);
SSubmitBlk* pSubmitBlk = (SSubmitBlk*)p;
p += sizeof(SSubmitBlk);
SControlData* pControlData = (SControlData* )p;
// SMsgDesc
pMsgDesc->numOfVnodes = htonl(1);
// SSubmitMsg
int32_t size = pCmd->payloadLen - sizeof(SMsgDesc);
pSubmitMsg->header.vgId = htonl(pTableMeta->vgId);
pSubmitMsg->header.contLen = htonl(size);
pSubmitMsg->length = pSubmitMsg->header.contLen;
pSubmitMsg->numOfBlocks = htonl(1);
// SSubmitBlk
pSubmitBlk->flag = FLAG_BLK_CONTROL; // this is control block
pSubmitBlk->tid = htonl(pTableMeta->id.tid);
pSubmitBlk->uid = htobe64(pTableMeta->id.uid);
pSubmitBlk->numOfRows = htons(1);
pSubmitBlk->schemaLen = 0; // only server return TSDB_CODE_TDB_TABLE_RECONFIGURE need schema attached
pSubmitBlk->sversion = htonl(pTableMeta->sversion);
pSubmitBlk->dataLen = htonl(sizeof(SControlData) + tagCondLen);
// SControlData
pControlData->command = htonl(command);
pControlData->win.skey = htobe64(pQueryInfo->window.skey);
pControlData->win.ekey = htobe64(pQueryInfo->window.ekey);
// set tagCond
if (pCond) {
pControlData->tagCondLen = htonl(pCond->len);
memcpy(pControlData->tagCond, pCond->cond, pCond->len);
}
return TSDB_CODE_SUCCESS;
}
void tscInitMsgsFp() { void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg; tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
tscBuildMsg[TSDB_SQL_DELETE_DATA] = tscBuildDelDataMsg;
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
...@@ -3392,5 +3476,4 @@ void tscInitMsgsFp() { ...@@ -3392,5 +3476,4 @@ void tscInitMsgsFp() {
tscKeepConn[TSDB_SQL_SELECT] = 1; tscKeepConn[TSDB_SQL_SELECT] = 1;
tscKeepConn[TSDB_SQL_FETCH] = 1; tscKeepConn[TSDB_SQL_FETCH] = 1;
tscKeepConn[TSDB_SQL_HB] = 1; tscKeepConn[TSDB_SQL_HB] = 1;
} }
\ No newline at end of file
...@@ -435,6 +435,13 @@ int taos_affected_rows(TAOS_RES *tres) { ...@@ -435,6 +435,13 @@ int taos_affected_rows(TAOS_RES *tres) {
return pSql->res.numOfRows; return pSql->res.numOfRows;
} }
int taos_affected_tables(TAOS_RES *tres) {
SSqlObj* pSql = (SSqlObj*) tres;
if (pSql == NULL || pSql->signature != pSql) return 0;
return pSql->res.numOfTables;
}
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
......
...@@ -2034,8 +2034,6 @@ _return: ...@@ -2034,8 +2034,6 @@ _return:
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
...@@ -2707,7 +2705,7 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { ...@@ -2707,7 +2705,7 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
tfree(p); tfree(p);
} }
static void doConcurrentlySendSubQueries(SSqlObj* pSql) { void doConcurrentlySendSubQueries(SSqlObj* pSql) {
SSubqueryState *pState = &pSql->subState; SSubqueryState *pState = &pSql->subState;
// concurrently sent the query requests. // concurrently sent the query requests.
...@@ -2810,7 +2808,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2810,7 +2808,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i; trs->subqueryIndex = i;
trs->pParentSql = pSql; trs->pParentSql = pSql;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL, tscRetrieveDataRes, TSDB_SQL_SELECT);
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs->localBuffer); tfree(trs->localBuffer);
...@@ -2913,7 +2911,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 ...@@ -2913,7 +2911,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
tscError("0x%"PRIx64" sub:0x%"PRIx64" retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql->self, pSql->self, tscError("0x%"PRIx64" sub:0x%"PRIx64" retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql->self, pSql->self,
tstrerror(code), subqueryIndex, trsupport->numOfRetry); tstrerror(code), subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql); SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql,tscRetrieveDataRes, TSDB_SQL_SELECT);
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d", tscError("0x%"PRIx64" sub:0x%"PRIx64" failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
oriTrs->pParentSql->self, pSql->self, tstrerror(terrno), pVgroup->vgId, oriTrs->subqueryIndex); oriTrs->pParentSql->self, pSql->self, tstrerror(terrno), pVgroup->vgId, oriTrs->subqueryIndex);
...@@ -3259,12 +3257,12 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -3259,12 +3257,12 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
} }
} }
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj, __async_cb_func_t fp, int32_t cmd) {
const int32_t table_index = 0; const int32_t table_index = 0;
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pPQueryInfo = tscGetQueryInfo(pCmd); // Parent SQueryInfo SQueryInfo *pPQueryInfo = tscGetQueryInfo(pCmd); // Parent SQueryInfo
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj); SSqlObj *pNew = createSubqueryObj(pSql, table_index, fp, trsupport, cmd, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd);
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "ttimer.h" #include "ttimer.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "cJSON.h" #include "cJSON.h"
#include "tscDelete.h"
#ifdef HTTP_EMBEDDED #ifdef HTTP_EMBEDDED
#include "httpInt.h" #include "httpInt.h"
...@@ -3339,6 +3340,9 @@ bool tscShouldBeFreed(SSqlObj* pSql) { ...@@ -3339,6 +3340,9 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t tableIndex) { STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t tableIndex) {
assert(pCmd != NULL); assert(pCmd != NULL);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if(pQueryInfo == NULL) {
return NULL;
}
return tscGetMetaInfo(pQueryInfo, tableIndex); return tscGetMetaInfo(pQueryInfo, tableIndex);
} }
...@@ -4209,7 +4213,13 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4209,7 +4213,13 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
(*pSql->fp)(pSql->param, pSql, 0); (*pSql->fp)(pSql->param, pSql, 0);
return; return;
} } else if (pSql->cmd.command == TSDB_SQL_DELETE_DATA) {
code = executeDelete(pSql, pQueryInfo);
if (code != TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
}
return ;
}
if (pSql->cmd.command == TSDB_SQL_SELECT) { if (pSql->cmd.command == TSDB_SQL_SELECT) {
tscAddIntoSqlList(pSql); tscAddIntoSqlList(pSql);
...@@ -4349,6 +4359,15 @@ bool tscIsUpdateQuery(SSqlObj* pSql) { ...@@ -4349,6 +4359,15 @@ bool tscIsUpdateQuery(SSqlObj* pSql) {
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command); return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command);
} }
bool tscIsDeleteQuery(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return false;
}
SSqlCmd* pCmd = &pSql->cmd;
return pCmd->command == TSDB_SQL_DELETE_DATA;
}
char* tscGetSqlStr(SSqlObj* pSql) { char* tscGetSqlStr(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
return NULL; return NULL;
......
...@@ -35,6 +35,7 @@ enum { ...@@ -35,6 +35,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DELETE_DATA, "delete-data" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
// the SQL below is for mgmt node // the SQL below is for mgmt node
......
...@@ -141,4 +141,7 @@ int32_t tNameSetAcctId(SName* dst, const char* acct); ...@@ -141,4 +141,7 @@ int32_t tNameSetAcctId(SName* dst, const char* acct);
int32_t tNameSetDbName(SName* dst, const char* acct, SStrToken* dbToken); int32_t tNameSetDbName(SName* dst, const char* acct, SStrToken* dbToken);
// define uniform string
#define STR_INVALID_TABLE_NAME "invalid table name"
#endif // TDENGINE_NAME_H #endif // TDENGINE_NAME_H
...@@ -514,7 +514,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -514,7 +514,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pBlk->tid = htonl(pObj->tid); pBlk->tid = htonl(pObj->tid);
pBlk->numOfRows = htons(1); pBlk->numOfRows = htons(1);
pBlk->sversion = htonl(pSchema->version); pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0; pBlk->flag = 0;
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow); pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "tqueue.h" #include "tqueue.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "tthread.h"
typedef struct { typedef struct {
taos_qall qall; taos_qall qall;
...@@ -161,6 +162,40 @@ void dnodeFreeVWriteQueue(void *pWqueue) { ...@@ -161,6 +162,40 @@ void dnodeFreeVWriteQueue(void *pWqueue) {
taosCloseQueue(pWqueue); taosCloseQueue(pWqueue);
} }
void* waitingResultThread(void* param) {
SVWriteMsg* pWrite = (SVWriteMsg* )param;
// wait AddWaitThread to list finished
dDebug(":SDEL pVnode:%p wait AddWaitThread finished... pWrite=%p", pWrite->pVnode, pWrite);
tsem_t* psem = vnodeSemWait(pWrite->pVnode);
tsem_wait(psem);
tsem_post(psem);
dDebug(":SDEL pVnode:%p wait AddWaitThread ok pWrite=%p", pWrite->pVnode, pWrite);
// wait request deal finished
int32_t ret = tsem_wait(pWrite->rspRet.psem);
dDebug(":SDEL pVnode:%p wait request ok pWrite=%p", pWrite->pVnode, pWrite);
if(ret == 0) {
// success
}
tsem_destroy(pWrite->rspRet.psem);
tfree(pWrite->rspRet.psem);
// wait ok
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len,
.code = pWrite->code,
};
rpcSendResponse(&rpcRsp);
// remove from thread manager
vnodeRemoveWait(pWrite->pVnode, pWrite);
vnodeFreeFromWQueue(pWrite->pVnode, pWrite);
return NULL;
}
void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (wparam == NULL) return; if (wparam == NULL) return;
SVWriteMsg *pWrite = wparam; SVWriteMsg *pWrite = wparam;
...@@ -177,8 +212,27 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { ...@@ -177,8 +212,27 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
.code = pWrite->code, .code = pWrite->code,
}; };
rpcSendResponse(&rpcRsp); if(pWrite->rspRet.psem == 0) {
vnodeFreeFromWQueue(pVnode, pWrite); // no wait response
rpcSendResponse(&rpcRsp);
vnodeFreeFromWQueue(pVnode, pWrite);
} else {
if (vnodeWaitTooMany(pVnode)) {
// too many wait , so can not wait again
rpcRsp.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rpcRsp);
vnodeFreeFromWQueue(pVnode, pWrite);
} else {
tsem_t* psem = vnodeSemWait(pVnode);
tsem_wait(psem);
// need async to wait result in another thread
pthread_t* thread = taosCreateThread(waitingResultThread, pWrite);
// add to wait thread manager
vnodeAddWait(pVnode, thread, pWrite->rspRet.psem, pWrite);
dDebug(":SDEL pVnode=%p vnode add wait %p ok, tsem_post.", pVnode, pWrite);
tsem_post(psem);
}
}
} }
static void *dnodeProcessVWriteQueue(void *wparam) { static void *dnodeProcessVWriteQueue(void *wparam) {
......
...@@ -212,6 +212,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS* taos, char* lines[], int numLi ...@@ -212,6 +212,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS* taos, char* lines[], int numLi
DLL_EXPORT int32_t taos_parse_time(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); DLL_EXPORT int32_t taos_parse_time(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
DLL_EXPORT int taos_affected_tables(TAOS_RES *res);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -249,6 +249,7 @@ int32_t* taosGetErrno(); ...@@ -249,6 +249,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied" #define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied"
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing" #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing"
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state" #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state"
#define TSDB_CODE_WAIT_THREAD_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0515) //"Wait threads too many"
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID") #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
......
...@@ -120,6 +120,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) ...@@ -120,6 +120,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
// delete
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DELDATA, "delete-data" )
#ifndef TAOS_MESSAGE_C #ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105 TSDB_MSG_TYPE_MAX // 105
#endif #endif
...@@ -195,6 +199,9 @@ enum _mgmt_table { ...@@ -195,6 +199,9 @@ enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_IS_UD_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
// SSubmitBlk->flag define
#define FLAG_BLK_CONTROL 0x00000001 // SSubmitBlk is a control block to submit
#define IS_CONTROL_BLOCK(x) (x->flag & FLAG_BLK_CONTROL)
extern char *taosMsg[]; extern char *taosMsg[];
...@@ -219,7 +226,7 @@ typedef struct SMsgHead { ...@@ -219,7 +226,7 @@ typedef struct SMsgHead {
typedef struct SSubmitBlk { typedef struct SSubmitBlk {
uint64_t uid; // table unique id uint64_t uid; // table unique id
int32_t tid; // table id int32_t tid; // table id
int32_t padding; // TODO just for padding here int32_t flag; // extend special information, can see FLAG_BLK_??? define
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists int32_t schemaLen; // schema length, if length is 0, no schema exists
...@@ -249,7 +256,7 @@ typedef struct { ...@@ -249,7 +256,7 @@ typedef struct {
int32_t numOfRows; // number of records the client is trying to write int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records) int32_t failedRows; // number of failed records (exclude duplicate records)
int32_t numOfFailedBlocks; int32_t numOfTables; // affected tables
SShellSubmitRspBlock failedBlocks[]; SShellSubmitRspBlock failedBlocks[];
} SShellSubmitRspMsg; } SShellSubmitRspMsg;
...@@ -260,6 +267,11 @@ typedef struct SSchema { ...@@ -260,6 +267,11 @@ typedef struct SSchema {
int16_t bytes; int16_t bytes;
} SSchema; } SSchema;
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct { typedef struct {
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;
...@@ -408,6 +420,23 @@ typedef struct { ...@@ -408,6 +420,23 @@ typedef struct {
int32_t vgId; int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; } SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct {
int32_t contLen;
int32_t vgId;
uint64_t uid;
uint16_t nSpan;
char tableFname[TSDB_TABLE_FNAME_LEN];
STimeWindow span[];
} STruncateTblMsg;
typedef struct {
int32_t contLen;
int32_t vgId;
uint64_t uid;
uint16_t nSpan;
char tableFname[TSDB_TABLE_FNAME_LEN];
STimeWindow span[];
} SDeleteDataMsg;
typedef struct SColIndex { typedef struct SColIndex {
int16_t colId; // column id int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
...@@ -460,11 +489,6 @@ typedef struct STableIdInfo { ...@@ -460,11 +489,6 @@ typedef struct STableIdInfo {
TSKEY key; // last accessed ts, for subscription TSKEY key; // last accessed ts, for subscription
} STableIdInfo; } STableIdInfo;
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct { typedef struct {
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block int32_t tsLen; // total length of ts comp block
...@@ -564,6 +588,7 @@ typedef struct { ...@@ -564,6 +588,7 @@ typedef struct {
uint8_t role; uint8_t role;
uint8_t replica; uint8_t replica;
uint8_t compact; uint8_t compact;
uint8_t truncate;
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
...@@ -977,6 +1002,21 @@ typedef struct { ...@@ -977,6 +1002,21 @@ typedef struct {
char value[]; char value[];
} STLV; } STLV;
// Ox00000001 ~ 0x00010000 command id 16 items
#define CMD_DELETE_DATA 0x00000001
// 0x00010000 ~ 0x10000000 command flag 16 items
#define FLAG_SUPER_TABLE 0x00010000
#define GET_CTLDATA_SIZE(p) (sizeof(SControlData) + p->tnum * sizeof(int32_t))
typedef struct SControlData {
uint32_t command; // see define CMD_???
STimeWindow win;
// tag cond
int32_t tagCondLen;
char tagCond[];
} SControlData;
enum { enum {
TLV_TYPE_END_MARK = -1, TLV_TYPE_END_MARK = -1,
//TLV_TYPE_DUMMY = 1, //TLV_TYPE_DUMMY = 1,
......
...@@ -96,6 +96,7 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit); ...@@ -96,6 +96,7 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
int tsdbGetState(STsdbRepo *repo); int tsdbGetState(STsdbRepo *repo);
int8_t tsdbGetCompactState(STsdbRepo *repo); int8_t tsdbGetCompactState(STsdbRepo *repo);
int8_t tsdbGetDeleteState(STsdbRepo *repo);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
typedef struct { typedef struct {
uint64_t uid; // the unique table ID uint64_t uid; // the unique table ID
...@@ -159,7 +160,7 @@ typedef struct { ...@@ -159,7 +160,7 @@ typedef struct {
* *
* @return the number of points inserted, -1 for failure and the error number is set * @return the number of points inserted, -1 for failure and the error number is set
*/ */
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp, tsem_t** ppSem);
// -- FOR QUERY TIME SERIES DATA // -- FOR QUERY TIME SERIES DATA
...@@ -420,6 +421,9 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd); ...@@ -420,6 +421,9 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact // For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo); int tsdbCompact(STsdbRepo *pRepo);
// For TSDB delete data
int tsdbDeleteData(STsdbRepo *pRepo, void *param);
// For TSDB Health Monitor // For TSDB Health Monitor
// no problem return true // no problem return true
...@@ -442,6 +446,8 @@ char* parseTagDatatoJson(void *p); ...@@ -442,6 +446,8 @@ char* parseTagDatatoJson(void *p);
typedef bool (*readover_callback)(void* param, int8_t type, int32_t tid); typedef bool (*readover_callback)(void* param, int8_t type, int32_t tid);
void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param); void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param);
int32_t tsdbTableTid(void* pTable);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -177,48 +177,49 @@ ...@@ -177,48 +177,49 @@
#define TK_CONNECTION 159 #define TK_CONNECTION 159
#define TK_STREAM 160 #define TK_STREAM 160
#define TK_COLON 161 #define TK_COLON 161
#define TK_ABORT 162 #define TK_DELETE 162
#define TK_AFTER 163 #define TK_ABORT 163
#define TK_ATTACH 164 #define TK_AFTER 164
#define TK_BEFORE 165 #define TK_ATTACH 165
#define TK_BEGIN 166 #define TK_BEFORE 166
#define TK_CASCADE 167 #define TK_BEGIN 167
#define TK_CLUSTER 168 #define TK_CASCADE 168
#define TK_CONFLICT 169 #define TK_CLUSTER 169
#define TK_COPY 170 #define TK_CONFLICT 170
#define TK_DEFERRED 171 #define TK_COPY 171
#define TK_DELIMITERS 172 #define TK_DEFERRED 172
#define TK_DETACH 173 #define TK_DELIMITERS 173
#define TK_EACH 174 #define TK_DETACH 174
#define TK_END 175 #define TK_EACH 175
#define TK_EXPLAIN 176 #define TK_END 176
#define TK_FAIL 177 #define TK_EXPLAIN 177
#define TK_FOR 178 #define TK_FAIL 178
#define TK_IGNORE 179 #define TK_FOR 179
#define TK_IMMEDIATE 180 #define TK_IGNORE 180
#define TK_INITIALLY 181 #define TK_IMMEDIATE 181
#define TK_INSTEAD 182 #define TK_INITIALLY 182
#define TK_KEY 183 #define TK_INSTEAD 183
#define TK_OF 184 #define TK_KEY 184
#define TK_RAISE 185 #define TK_OF 185
#define TK_REPLACE 186 #define TK_RAISE 186
#define TK_RESTRICT 187 #define TK_REPLACE 187
#define TK_ROW 188 #define TK_RESTRICT 188
#define TK_STATEMENT 189 #define TK_ROW 189
#define TK_TRIGGER 190 #define TK_STATEMENT 190
#define TK_VIEW 191 #define TK_TRIGGER 191
#define TK_IPTOKEN 192 #define TK_VIEW 192
#define TK_SEMI 193 #define TK_IPTOKEN 193
#define TK_NONE 194 #define TK_SEMI 194
#define TK_PREV 195 #define TK_NONE 195
#define TK_LINEAR 196 #define TK_PREV 196
#define TK_IMPORT 197 #define TK_LINEAR 197
#define TK_TBNAME 198 #define TK_IMPORT 198
#define TK_JOIN 199 #define TK_TBNAME 199
#define TK_INSERT 200 #define TK_JOIN 200
#define TK_INTO 201 #define TK_INSERT 201
#define TK_VALUES 202 #define TK_INTO 202
#define TK_FILE 203 #define TK_VALUES 203
#define TK_FILE 204
#define TK_SPACE 300 #define TK_SPACE 300
......
...@@ -32,6 +32,7 @@ typedef struct { ...@@ -32,6 +32,7 @@ typedef struct {
int32_t len; int32_t len;
void * rsp; void * rsp;
void * qhandle; // used by query and retrieve msg void * qhandle; // used by query and retrieve msg
tsem_t* psem; // if it is not zero, need wait result with async
} SRspRet; } SRspRet;
typedef struct { typedef struct {
...@@ -58,6 +59,13 @@ typedef struct { ...@@ -58,6 +59,13 @@ typedef struct {
SWalHead walHead; SWalHead walHead;
} SVWriteMsg; } SVWriteMsg;
typedef struct {
int32_t startTime;
pthread_t * pthread;
tsem_t * psem;
void * param;
} SWaitThread;
// vnodeStatus // vnodeStatus
extern char *vnodeStatus[]; extern char *vnodeStatus[];
...@@ -96,6 +104,13 @@ int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qt ...@@ -96,6 +104,13 @@ int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qt
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
// wait thread
void vnodeAddWait(void* pVnode, pthread_t* pthread, tsem_t* psem, void* param);
void vnodeRemoveWait(void* pVnode, void* param);
// get wait thread count
bool vnodeWaitTooMany(void* vparam);
tsem_t* vnodeSemWait(void* vparam);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -291,7 +291,20 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { ...@@ -291,7 +291,20 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
return; return;
} }
if (!tscIsUpdateQuery(pSql)) { // select and show kinds of commands if (tscIsDeleteQuery(pSql)) {
// delete
int numOfRows = taos_affected_rows(pSql);
int numOfTables = taos_affected_tables(pSql);
int error_no = taos_errno(pSql);
et = taosGetTimestampUs();
if (error_no == TSDB_CODE_SUCCESS) {
printf("Deleted %d row(s) from %d table(s) (%.6fs)\n", numOfRows, numOfTables, (et - st) / 1E6);
} else {
printf("Deleted interrupted (%s), %d row(s) from %d tables (%.6fs)\n", taos_errstr(pSql), numOfRows, numOfTables, (et - st) / 1E6);
}
}
else if (!tscIsUpdateQuery(pSql)) { // select and show kinds of commands
int error_no = 0; int error_no = 0;
int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
......
...@@ -352,7 +352,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl ...@@ -352,7 +352,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes); pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes);
mnodeSendAlterVgroupMsg(pVgroup,NULL); mnodeSendAlterVgroupMsg(pVgroup,NULL);
} }
pVgroup->compact = pVload->compact; pVgroup->compact = pVload->compact;
} }
static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) { static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
...@@ -848,7 +848,6 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v ...@@ -848,7 +848,6 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pVgroup->compact; *(int8_t *)pWrite = pVgroup->compact;
cols++; cols++;
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
numOfRows++; numOfRows++;
} }
......
...@@ -252,6 +252,12 @@ typedef struct SMiscInfo { ...@@ -252,6 +252,12 @@ typedef struct SMiscInfo {
}; };
} SMiscInfo; } SMiscInfo;
typedef struct SDelData {
bool existsCheck;
SStrToken tableName;
struct tSqlExpr* pWhere;
} SDelData;
typedef struct SSqlInfo { typedef struct SSqlInfo {
int32_t type; int32_t type;
bool valid; bool valid;
...@@ -262,6 +268,7 @@ typedef struct SSqlInfo { ...@@ -262,6 +268,7 @@ typedef struct SSqlInfo {
SCreateTableSql *pCreateTableInfo; SCreateTableSql *pCreateTableInfo;
SAlterTableInfo *pAlterInfo; SAlterTableInfo *pAlterInfo;
SMiscInfo *pMiscInfo; SMiscInfo *pMiscInfo;
SDelData *pDelData;
}; };
} SSqlInfo; } SSqlInfo;
...@@ -364,6 +371,9 @@ void tSetDbName(SStrToken *pCpxName, SStrToken *pDb); ...@@ -364,6 +371,9 @@ void tSetDbName(SStrToken *pCpxName, SStrToken *pDb);
void tSetColumnInfo(TAOS_FIELD *pField, SStrToken *pName, TAOS_FIELD *pType); void tSetColumnInfo(TAOS_FIELD *pField, SStrToken *pName, TAOS_FIELD *pType);
void tSetColumnType(TAOS_FIELD *pField, SStrToken *type); void tSetColumnType(TAOS_FIELD *pField, SStrToken *type);
// malloc new SDelData and set with args
SDelData *tGetDelData(SStrToken *pTableName, SStrToken* existsCheck, tSqlExpr* pWhere);
/** /**
* *
* @param yyp The parser * @param yyp The parser
......
...@@ -985,7 +985,16 @@ cmd ::= KILL CONNECTION INTEGER(Y). {setKillSql(pInfo, TSDB_SQL_KILL_CONNECTIO ...@@ -985,7 +985,16 @@ cmd ::= KILL CONNECTION INTEGER(Y). {setKillSql(pInfo, TSDB_SQL_KILL_CONNECTIO
cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_STREAM, &X);} cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_STREAM, &X);}
cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_QUERY, &X);} cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_QUERY, &X);}
//////////////////////////////////// DEL TRUNCATE TABLE /////////////////////////////////////
//1 DELETE FROM TBNAME/STBNAME WHERE TS AND TAG CONDICTION
cmd ::= DELETE FROM ifexists(Y) ids(X) cpxName(Z) where_opt(W). {
X.n += Z.n;
SDelData * pDelData = tGetDelData(&X, &Y, W);
setSqlInfo(pInfo, pDelData, NULL, TSDB_SQL_DELETE_DATA);
}
%fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED %fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
LIKE MATCH NMATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL LIKE MATCH NMATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL
NOW TODAY IPTOKEN SEMI NONE PREV LINEAR IMPORT TBNAME JOIN STABLE NULL INSERT INTO VALUES FILE. NOW TODAY IPTOKEN SEMI NONE PREV LINEAR IMPORT TBNAME JOIN STABLE NULL INSERT INTO VALUES FILE.
\ No newline at end of file
...@@ -2453,6 +2453,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2453,6 +2453,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap); taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL; pRuntimeEnv->pTableRetrieveTsMap = NULL;
taosHashCleanup(pRuntimeEnv->pTablesRead);
pRuntimeEnv->pTablesRead = NULL;
taosHashCleanup(pRuntimeEnv->pResultRowListSet); taosHashCleanup(pRuntimeEnv->pResultRowListSet);
pRuntimeEnv->pResultRowListSet = NULL; pRuntimeEnv->pResultRowListSet = NULL;
......
...@@ -1212,6 +1212,9 @@ void SqlInfoDestroy(SSqlInfo *pInfo) { ...@@ -1212,6 +1212,9 @@ void SqlInfoDestroy(SSqlInfo *pInfo) {
taosArrayDestroy(&pInfo->funcs); taosArrayDestroy(&pInfo->funcs);
if (pInfo->type == TSDB_SQL_SELECT) { if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSqlNode(pInfo->list); destroyAllSqlNode(pInfo->list);
} else if (pInfo->type == TSDB_SQL_DELETE_DATA) {
tSqlExprDestroy(pInfo->pDelData->pWhere);
tfree(pInfo->pDelData);
} else if (pInfo->type == TSDB_SQL_CREATE_TABLE) { } else if (pInfo->type == TSDB_SQL_CREATE_TABLE) {
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo); pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) { } else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
...@@ -1474,3 +1477,15 @@ void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo) { ...@@ -1474,3 +1477,15 @@ void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo) {
pDBInfo->dbType = TSDB_DB_TYPE_TOPIC; pDBInfo->dbType = TSDB_DB_TYPE_TOPIC;
pDBInfo->partitions = TSDB_DEFAULT_DB_PARTITON_OPTION; pDBInfo->partitions = TSDB_DEFAULT_DB_PARTITON_OPTION;
} }
// malloc new SDelData and set with args
SDelData* tGetDelData(SStrToken* pTableName, SStrToken* existsCheck, tSqlExpr* pWhere) {
// malloc
SDelData* pDelData = (SDelData *) calloc(1, sizeof(SDelData));
// set value
pDelData->existsCheck = (existsCheck->n == 1);
pDelData->tableName = *pTableName;
pDelData->pWhere = pWhere;
return pDelData;
}
\ No newline at end of file
此差异已折叠。
...@@ -142,6 +142,7 @@ static int32_t tsRpcNum = 0; ...@@ -142,6 +142,7 @@ static int32_t tsRpcNum = 0;
#define RPC_CONN_UDPC 1 #define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2 #define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3 #define RPC_CONN_TCPC 3
#define RPC_CONN_AUTO 4 // need tcp use tcp
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpConnection, taosInitUdpConnection,
...@@ -405,7 +406,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64 ...@@ -405,7 +406,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64
// connection type is application specific. // connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection // for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType; char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_SUBMIT
|| type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP
|| type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META
|| type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || type == TSDB_MSG_TYPE_CM_ALTER_TABLE) || type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || type == TSDB_MSG_TYPE_CM_ALTER_TABLE)
......
...@@ -34,7 +34,7 @@ typedef struct { ...@@ -34,7 +34,7 @@ typedef struct {
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo); void *tsdbCommitData(STsdbRepo *pRepo, bool end);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
...@@ -42,6 +42,9 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFil ...@@ -42,6 +42,9 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFil
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf); SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf);
int tsdbApplyRtn(STsdbRepo *pRepo); int tsdbApplyRtn(STsdbRepo *pRepo);
// commit control command
int tsdbCommitControl(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) { if (fid >= pRtn->maxFid) {
return 0; return 0;
...@@ -54,4 +57,21 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { ...@@ -54,4 +57,21 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
} }
} }
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
int64_t fid;
if (key < 0) {
fid = ((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
fid = ((key / tsTickPerDay[precision] / days));
}
// check fid over int max or min, set with int max or min
if (fid > INT32_MAX) {
fid = INT32_MAX;
} else if(fid < INT32_MIN){
fid = INT32_MIN;
}
return (int)fid;
}
#endif /* _TD_TSDB_COMMIT_H_ */ #endif /* _TD_TSDB_COMMIT_H_ */
\ No newline at end of file
...@@ -16,8 +16,14 @@ ...@@ -16,8 +16,14 @@
#ifndef _TD_TSDB_COMMIT_QUEUE_H_ #ifndef _TD_TSDB_COMMIT_QUEUE_H_
#define _TD_TSDB_COMMIT_QUEUE_H_ #define _TD_TSDB_COMMIT_QUEUE_H_
typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T; typedef enum {
COMMIT_REQ,
COMMIT_BOTH_REQ,
COMPACT_REQ,
CONTROL_REQ,
COMMIT_CONFIG_REQ,
} TSDB_REQ_T;
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req); int tsdbScheduleCommit(STsdbRepo *pRepo, void* param, TSDB_REQ_T req);
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */ #endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_DELETE_H_
#define _TD_TSDB_DELETE_H_
#ifdef __cplusplus
extern "C" {
#endif
// SControlData addition information
#define GET_CTLINFO_SIZE(p) (sizeof(SControlDataInfo) + p->tnum * sizeof(int32_t))
typedef struct {
// addition info
tsem_t* pSem;
bool memNull; // pRepo->mem is NULL, this is true
int32_t affectedRows;
SShellSubmitRspMsg *pRsp;
// base ControlData
STimeWindow win; // come from SControlData.win
uint32_t command; // come from SControlData.command
int32_t tnum; // num of tids array
int32_t tids[];
} SControlDataInfo;
// -------- interface ---------
// delete
int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo);
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_DELETE_H_ */
\ No newline at end of file
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef _TD_TSDB_MEMTABLE_H_ #ifndef _TD_TSDB_MEMTABLE_H_
#define _TD_TSDB_MEMTABLE_H_ #define _TD_TSDB_MEMTABLE_H_
#include "tsdbDelete.h"
typedef struct { typedef struct {
int rowsInserted; int rowsInserted;
int rowsUpdated; int rowsUpdated;
...@@ -65,11 +66,12 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); ...@@ -65,11 +66,12 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pATable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pATable);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot); void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); // if pCtrlData is NULL, force must be true
int tsdbAsyncCommit(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo);
int tsdbSyncCommitConfig(STsdbRepo* pRepo); int tsdbSyncCommitConfig(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo); void* tsdbCommitData(STsdbRepo* pRepo, bool end);
static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) { static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
......
...@@ -162,7 +162,7 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { ...@@ -162,7 +162,7 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
} }
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) { static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == memRowKey(pTable->lastRow))); ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == TSKEY_INITIAL_VAL) || (pTable->lastKey == memRowKey(pTable->lastRow)));
return pTable->lastKey; return pTable->lastKey;
} }
......
...@@ -66,6 +66,8 @@ extern "C" { ...@@ -66,6 +66,8 @@ extern "C" {
#include "tsdbCommit.h" #include "tsdbCommit.h"
// Compact // Compact
#include "tsdbCompact.h" #include "tsdbCompact.h"
// Delete
#include "tsdbDelete.h"
// Commit Queue // Commit Queue
#include "tsdbCommitQueue.h" #include "tsdbCommitQueue.h"
...@@ -97,6 +99,8 @@ struct STsdbRepo { ...@@ -97,6 +99,8 @@ struct STsdbRepo {
SMergeBuf mergeBuf; //used when update=2 SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
int8_t deleteState; // truncate state: inTruncate/noTruncate/waitingTruncate
pthread_t* pthread; pthread_t* pthread;
}; };
...@@ -112,9 +116,10 @@ STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); ...@@ -112,9 +116,10 @@ STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo);
UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable); int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable, bool force);
void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]);
int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx, bool onlyKey);
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
ASSERT(pRepo != NULL); ASSERT(pRepo != NULL);
......
...@@ -17,13 +17,6 @@ ...@@ -17,13 +17,6 @@
extern int32_t tsTsdbMetaCompactRatio; extern int32_t tsTsdbMetaCompactRatio;
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
}
}
typedef struct { typedef struct {
SRtn rtn; // retention snapshot SRtn rtn; // retention snapshot
...@@ -73,7 +66,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); ...@@ -73,7 +66,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
static int tsdbCommitTSData(STsdbRepo *pRepo); static int tsdbCommitTSData(STsdbRepo *pRepo);
static void tsdbStartCommit(STsdbRepo *pRepo); static void tsdbStartCommit(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo, int eno); static void tsdbEndCommit(STsdbRepo *pRepo, int eno, bool end);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCreateCommitIters(SCommitH *pCommith); static int tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith); static void tsdbDestroyCommitIters(SCommitH *pCommith);
...@@ -100,14 +93,14 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p ...@@ -100,14 +93,14 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update); TSKEY maxKey, int maxRows, int8_t update);
void *tsdbCommitData(STsdbRepo *pRepo) { void *tsdbCommitData(STsdbRepo *pRepo, bool end) {
if (pRepo->imem == NULL) { if (pRepo->imem == NULL) {
return NULL; return NULL;
} }
tsdbStartCommit(pRepo); tsdbStartCommit(pRepo);
if (tsShortcutFlag & TSDB_SHORTCUT_RB_TSDB_COMMIT) { if (tsShortcutFlag & TSDB_SHORTCUT_RB_TSDB_COMMIT) {
tsdbEndCommit(pRepo, terrno); tsdbEndCommit(pRepo, terrno, end);
return NULL; return NULL;
} }
...@@ -123,14 +116,14 @@ void *tsdbCommitData(STsdbRepo *pRepo) { ...@@ -123,14 +116,14 @@ void *tsdbCommitData(STsdbRepo *pRepo) {
goto _err; goto _err;
} }
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS, end);
return NULL; return NULL;
_err: _err:
ASSERT(terrno != TSDB_CODE_SUCCESS); ASSERT(terrno != TSDB_CODE_SUCCESS);
pRepo->code = terrno; pRepo->code = terrno;
tsdbEndCommit(pRepo, terrno); tsdbEndCommit(pRepo, terrno, end);
return NULL; return NULL;
} }
...@@ -255,7 +248,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { ...@@ -255,7 +248,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i);
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; if (tsdbMakeRoom(ppBuf, tlen + size) < 0)
return -1;
void *ptr = POINTER_SHIFT(*ppBuf, tlen); void *ptr = POINTER_SHIFT(*ppBuf, tlen);
tsdbEncodeSBlockIdx(&ptr, pBlkIdx); tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
...@@ -264,7 +258,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { ...@@ -264,7 +258,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
} }
tlen += sizeof(TSCKSUM); tlen += sizeof(TSCKSUM);
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; if (tsdbMakeRoom(ppBuf, tlen) < 0)
return -1;
taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
...@@ -703,7 +698,7 @@ static void tsdbStartCommit(STsdbRepo *pRepo) { ...@@ -703,7 +698,7 @@ static void tsdbStartCommit(STsdbRepo *pRepo) {
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
} }
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { static void tsdbEndCommit(STsdbRepo *pRepo, int eno, bool end) {
if (eno != TSDB_CODE_SUCCESS) { if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo)); tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else { } else {
...@@ -712,14 +707,21 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { ...@@ -712,14 +707,21 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno); // notify
if (end && pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno);
}
SMemTable *pIMem = pRepo->imem; SMemTable *pIMem = pRepo->imem;
(void)tsdbLockRepo(pRepo); (void)tsdbLockRepo(pRepo);
pRepo->imem = NULL; pRepo->imem = NULL;
(void)tsdbUnlockRepo(pRepo); (void)tsdbUnlockRepo(pRepo);
tsdbUnRefMemTable(pRepo, pIMem); tsdbUnRefMemTable(pRepo, pIMem);
tsem_post(&(pRepo->readyToCommit));
// release readyToCommit allow next commit
if (end) {
tsem_post(&(pRepo->readyToCommit));
}
} }
#if 0 #if 0
...@@ -1787,3 +1789,28 @@ int tsdbApplyRtn(STsdbRepo *pRepo) { ...@@ -1787,3 +1789,28 @@ int tsdbApplyRtn(STsdbRepo *pRepo) {
return 0; return 0;
} }
// do control task
int tsdbCommitControl(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) {
int ret = TSDB_CODE_SUCCESS;
// do command
if(pCtlDataInfo->command & CMD_DELETE_DATA) {
// delete data
ret = tsdbControlDelete(pRepo, pCtlDataInfo);
}
// notify response thread to response result to client
if (pCtlDataInfo->pSem) {
tsem_post(pCtlDataInfo->pSem);
}
// deal wal
if (pRepo->appH.notifyStatus)
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, ret);
// release commitSep for next commit
tsem_post(&pRepo->readyToCommit);
return ret;
}
\ No newline at end of file
...@@ -28,6 +28,7 @@ typedef struct { ...@@ -28,6 +28,7 @@ typedef struct {
typedef struct { typedef struct {
TSDB_REQ_T req; TSDB_REQ_T req;
STsdbRepo *pRepo; STsdbRepo *pRepo;
void * param;
} SReq; } SReq;
static void *tsdbLoopCommit(void *arg); static void *tsdbLoopCommit(void *arg);
...@@ -91,7 +92,7 @@ void tsdbDestroyCommitQueue() { ...@@ -91,7 +92,7 @@ void tsdbDestroyCommitQueue() {
pthread_mutex_destroy(&(pQueue->lock)); pthread_mutex_destroy(&(pQueue->lock));
} }
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) { int tsdbScheduleCommit(STsdbRepo *pRepo, void *param, TSDB_REQ_T req) {
SCommitQueue *pQueue = &tsCommitQueue; SCommitQueue *pQueue = &tsCommitQueue;
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq)); SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq));
...@@ -102,6 +103,7 @@ int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) { ...@@ -102,6 +103,7 @@ int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
((SReq *)pNode->data)->req = req; ((SReq *)pNode->data)->req = req;
((SReq *)pNode->data)->pRepo = pRepo; ((SReq *)pNode->data)->pRepo = pRepo;
((SReq *)pNode->data)->param = param;
pthread_mutex_lock(&(pQueue->lock)); pthread_mutex_lock(&(pQueue->lock));
...@@ -158,6 +160,7 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -158,6 +160,7 @@ static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue; SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL; SListNode * pNode = NULL;
STsdbRepo * pRepo = NULL; STsdbRepo * pRepo = NULL;
void * param = NULL;
TSDB_REQ_T req; TSDB_REQ_T req;
setThreadName("tsdbCommit"); setThreadName("tsdbCommit");
...@@ -183,19 +186,27 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -183,19 +186,27 @@ static void *tsdbLoopCommit(void *arg) {
req = ((SReq *)pNode->data)->req; req = ((SReq *)pNode->data)->req;
pRepo = ((SReq *)pNode->data)->pRepo; pRepo = ((SReq *)pNode->data)->pRepo;
param = ((SReq *)pNode->data)->param;
if (req == COMMIT_REQ) { if (req == COMMIT_REQ) {
tsdbCommitData(pRepo); tsdbCommitData(pRepo, true);
} else if (req == COMPACT_REQ) { } else if (req == COMPACT_REQ) {
tsdbCompactImpl(pRepo); tsdbCompactImpl(pRepo);
} else if (req == COMMIT_CONFIG_REQ) { } else if (req == COMMIT_BOTH_REQ) {
SControlDataInfo* pCtlDataInfo = (SControlDataInfo* )param;
if(!pCtlDataInfo->memNull) {
tsdbInfo(":SDEL vgId=%d commit mem before delete data. mem=%p imem=%p \n", REPO_ID(pRepo), pRepo->mem, pRepo->imem);
tsdbCommitData(pRepo, false);
}
tsdbCommitControl(pRepo, param);
} else if (req == COMMIT_CONFIG_REQ) {
ASSERT(pRepo->config_changed); ASSERT(pRepo->config_changed);
tsdbApplyRepoConfig(pRepo); tsdbApplyRepoConfig(pRepo);
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
} else { } else {
ASSERT(0); ASSERT(0);
} }
tfree(param);
listNodeFree(pNode); listNodeFree(pNode);
} }
......
...@@ -97,11 +97,15 @@ _err: ...@@ -97,11 +97,15 @@ _err:
static int tsdbAsyncCompact(STsdbRepo *pRepo) { static int tsdbAsyncCompact(STsdbRepo *pRepo) {
if (pRepo->compactState != TSDB_NO_COMPACT) { if (pRepo->compactState != TSDB_NO_COMPACT) {
tsdbInfo("vgId:%d not compact tsdb again ", REPO_ID(pRepo)); tsdbInfo("vgId:%d not compact tsdb again ", REPO_ID(pRepo));
return 0; return 0;
} }
pRepo->compactState = TSDB_WAITING_COMPACT; pRepo->compactState = TSDB_WAITING_COMPACT;
tsem_wait(&(pRepo->readyToCommit)); tsem_wait(&(pRepo->readyToCommit));
return tsdbScheduleCommit(pRepo, COMPACT_REQ); int code = tsdbScheduleCommit(pRepo, NULL, COMPACT_REQ);
if (code != 0) {
tsem_post(&(pRepo->readyToCommit));
}
return code;
} }
static void tsdbStartCompact(STsdbRepo *pRepo) { static void tsdbStartCompact(STsdbRepo *pRepo) {
......
此差异已折叠。
...@@ -30,7 +30,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo); ...@@ -30,7 +30,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo);
static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo);
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh); static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
// Function declaration // Function declaration
int32_t tsdbCreateRepo(int repoid) { int32_t tsdbCreateRepo(int repoid) {
...@@ -188,7 +187,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { ...@@ -188,7 +187,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
STsdbCfg *pCfg = &(pRepo->config); STsdbCfg *pCfg = &(pRepo->config);
if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
if (tsdbAsyncCommit(pRepo) < 0) return -1; if (tsdbAsyncCommit(pRepo, NULL) < 0) return -1;
} }
return 0; return 0;
} }
...@@ -202,7 +201,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { ...@@ -202,7 +201,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
if ((pRepo->mem->extraBuffList != NULL) || if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit // trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1; if (tsdbAsyncCommit(pRepo, NULL) < 0) return -1;
} }
return 0; return 0;
} }
...@@ -215,6 +214,8 @@ int tsdbGetState(STsdbRepo *repo) { return repo->state; } ...@@ -215,6 +214,8 @@ int tsdbGetState(STsdbRepo *repo) { return repo->state; }
int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); } int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); }
int8_t tsdbGetDeleteState(STsdbRepo *repo) { return (int8_t)(repo->deleteState); }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL); ASSERT(repo != NULL);
STsdbRepo *pRepo = repo; STsdbRepo *pRepo = repo;
...@@ -574,6 +575,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -574,6 +575,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->state = TSDB_STATE_OK; pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
pRepo->compactState = 0; pRepo->compactState = 0;
pRepo->deleteState = 0;
pRepo->config = *pCfg; pRepo->config = *pCfg;
if (pAppH) { if (pAppH) {
pRepo->appH = *pAppH; pRepo->appH = *pAppH;
...@@ -811,8 +813,7 @@ out: ...@@ -811,8 +813,7 @@ out:
return err; return err;
} }
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx, bool onlyKey) {
ASSERT(pTable->lastRow == NULL);
if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) { if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) {
return -1; return -1;
} }
...@@ -841,17 +842,24 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, ...@@ -841,17 +842,24 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
} }
TSKEY lastKey = memRowKey(lastRow); TSKEY lastKey = memRowKey(lastRow);
// during the load data in file, new data would be inserted and last row has been updated // during the load data in file, new data would be inserted and last row has been updated
TSDB_WLOCK_TABLE(pTable); TSDB_WLOCK_TABLE(pTable);
if (pTable->lastRow == NULL) {
pTable->lastKey = lastKey; pTable->lastKey = lastKey;
pTable->lastRow = lastRow; if (!onlyKey) {
TSDB_WUNLOCK_TABLE(pTable); // set
if (pTable->lastRow) {
SMemRow* p = pTable->lastRow;
pTable->lastRow = lastRow;
taosTZfree(p);
} else {
pTable->lastRow = lastRow;
}
} else { } else {
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(lastRow); taosTZfree(lastRow);
} }
TSDB_WUNLOCK_TABLE(pTable);
return 0; return 0;
} }
...@@ -905,7 +913,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -905,7 +913,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
if (pIdx && lastKey < pIdx->maxKey) { if (pIdx && lastKey < pIdx->maxKey) {
pTable->lastKey = pIdx->maxKey; pTable->lastKey = pIdx->maxKey;
if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, false) != 0) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
return -1; return -1;
} }
...@@ -930,7 +938,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -930,7 +938,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
return 0; return 0;
} }
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable, bool force) {
SFSIter fsiter; SFSIter fsiter;
SReadH readh; SReadH readh;
SDFileSet *pSet; SDFileSet *pSet;
...@@ -939,6 +947,7 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { ...@@ -939,6 +947,7 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config)); bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config));
bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config)); bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config));
bool onlyKey = !cacheLastRow;
tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol); tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol);
...@@ -948,19 +957,24 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { ...@@ -948,19 +957,24 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
taosTZfree(pTable->lastRow); taosTZfree(pTable->lastRow);
pTable->lastRow = NULL; pTable->lastRow = NULL;
} }
if (!cacheLastCol && pTable->lastCols != NULL) { if ((!cacheLastCol && pTable->lastCols != NULL) || force) {
tsdbFreeLastColumns(pTable); tsdbFreeLastColumns(pTable);
} }
if (!cacheLastRow && !cacheLastCol) { if (!cacheLastRow && !cacheLastCol && !force) {
return 0; return 0;
} }
cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0; cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0;
cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0; cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0;
if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0) { if(force && cacheLastRowTableNum == 0) {
return 0; // if force update , must set 1
cacheLastRowTableNum = 1;
}
if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0 && !force) {
return 0;
} }
if (tsdbInitReadH(&readh, pRepo) < 0) { if (tsdbInitReadH(&readh, pRepo) < 0) {
...@@ -993,8 +1007,8 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { ...@@ -993,8 +1007,8 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
SBlockIdx *pIdx = readh.pBlkIdx; SBlockIdx *pIdx = readh.pBlkIdx;
if (pIdx && (cacheLastRowTableNum > 0) && (pTable->lastRow == NULL)) { if (pIdx && (cacheLastRowTableNum > 0) && (pTable->lastRow == NULL || force)) {
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, onlyKey) != 0) {
tsdbUnLockFS(REPO_FS(pRepo)); tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
return -1; return -1;
...@@ -1015,6 +1029,13 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { ...@@ -1015,6 +1029,13 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
} }
} }
if ( cacheLastRowTableNum > 0 && readh.pBlkIdx == NULL) {
// table no data, so reset lastKey
TSDB_WLOCK_TABLE(pTable);
pTable->lastKey = TSKEY_INITIAL_VAL;
TSDB_WUNLOCK_TABLE(pTable);
}
tsdbUnLockFS(REPO_FS(pRepo)); tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
...@@ -1113,7 +1134,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -1113,7 +1134,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) {
pTable->lastKey = pIdx->maxKey; pTable->lastKey = pIdx->maxKey;
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, false) != 0) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
return -1; return -1;
} }
......
此差异已折叠。
...@@ -639,7 +639,7 @@ static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) { ...@@ -639,7 +639,7 @@ static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
initTableMemIterator(pQueryHandle, pCheckInfo); initTableMemIterator(pQueryHandle, pCheckInfo);
} }
code = tsdbLoadLastCache(pRepo, pTable); code = tsdbLoadLastCache(pRepo, pTable, false);
if (code != 0) { if (code != 0) {
tsdbError("%p uid:%" PRId64 ", tid:%d, failed to load last cache since %s", pQueryHandle, pTable->tableId.uid, tsdbError("%p uid:%" PRId64 ", tid:%d, failed to load last cache since %s", pQueryHandle, pTable->tableId.uid,
pTable->tableId.tid, tstrerror(terrno)); pTable->tableId.tid, tstrerror(terrno));
...@@ -4691,3 +4691,9 @@ void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callba ...@@ -4691,3 +4691,9 @@ void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callba
pQueryHandle->param = param; pQueryHandle->param = param;
return ; return ;
} }
// get table tid
int32_t tsdbTableTid(void* pTable) {
STable *p = (STable *)pTable;
return p->tableId.tid;
}
\ No newline at end of file
...@@ -682,7 +682,6 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ...@@ -682,7 +682,6 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
uint32_t toffset = TSDB_KEY_COL_OFFSET; uint32_t toffset = TSDB_KEY_COL_OFFSET;
int32_t tlen = pBlock->keyLen; int32_t tlen = pBlock->keyLen;
if (dcol != 0) { if (dcol != 0) {
tsdbGetSBlockCol(pBlock, &pBlockCol, pBlockData->cols, ccol); tsdbGetSBlockCol(pBlock, &pBlockCol, pBlockData->cols, ccol);
tcolId = pBlockCol->colId; tcolId = pBlockCol->colId;
......
...@@ -74,12 +74,12 @@ static int insertData(SInsertInfo *pInfo) { ...@@ -74,12 +74,12 @@ static int insertData(SInsertInfo *pInfo) {
pBlock->tid = htonl(pBlock->tid); pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion); pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding); pBlock->flag = htonl(pBlock->flag);
pMsg->length = htonl(pMsg->length); pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInsertData(pInfo->pRepo, pMsg, NULL) < 0) { if (tsdbInsertData(pInfo->pRepo, pMsg, NULL, NULL) < 0) {
tfree(pMsg); tfree(pMsg);
return -1; return -1;
} }
......
...@@ -256,6 +256,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended") ...@@ -256,6 +256,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state")
TAOS_DEFINE_ERROR(TSDB_CODE_WAIT_THREAD_TOO_MANY, "Wait threads too many")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
...@@ -228,7 +228,8 @@ static SKeyword keywordTable[] = { ...@@ -228,7 +228,8 @@ static SKeyword keywordTable[] = {
{"RANGE", TK_RANGE}, {"RANGE", TK_RANGE},
{"CONTAINS", TK_CONTAINS}, {"CONTAINS", TK_CONTAINS},
{"TO", TK_TO}, {"TO", TK_TO},
{"SPLIT", TK_SPLIT} {"SPLIT", TK_SPLIT},
{"DELETE", TK_DELETE}
}; };
static const char isIdChar[] = { static const char isIdChar[] = {
......
...@@ -75,6 +75,9 @@ typedef struct { ...@@ -75,6 +75,9 @@ typedef struct {
tsem_t sem; tsem_t sem;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex; pthread_mutex_t statusMutex;
// thread for wait deal result to response client
SList * waitThreads;
tsem_t semWait;
} SVnodeObj; } SVnodeObj;
#ifdef __cplusplus #ifdef __cplusplus
......
此差异已折叠。
...@@ -63,6 +63,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -63,6 +63,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
SRspRet *pRspRet = NULL; SRspRet *pRspRet = NULL;
if (pWrite != NULL) pRspRet = &pWrite->rspRet; if (pWrite != NULL) pRspRet = &pWrite->rspRet;
// if wal and forward write , no need response
if( qtype == TAOS_QTYPE_WAL || qtype == TAOS_QTYPE_FWD) {
pRspRet = NULL;
}
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId, vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId,
...@@ -107,7 +111,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -107,7 +111,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
} }
if (code < 0) { if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1); if (syncCode > 0 && pWrite) atomic_sub_fetch_32(&pWrite->processedCount, 1);
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code); vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
pHead->version = 0; pHead->version = 0;
return code; return code;
...@@ -118,7 +122,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -118,7 +122,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// write data locally // write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, pRspRet);
if (code < 0) { if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1); if (syncCode > 0 && pWrite) atomic_sub_fetch_32(&pWrite->processedCount, 1);
return code; return code;
} }
...@@ -163,13 +167,15 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR ...@@ -163,13 +167,15 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
// save insert result into item // save insert result into item
SShellSubmitRspMsg *pRsp = NULL; SShellSubmitRspMsg *pRsp = NULL;
tsem_t** ppsem = NULL;
if (pRet) { if (pRet) {
pRet->len = sizeof(SShellSubmitRspMsg); pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len); pRet->rsp = rpcMallocCont(pRet->len);
pRsp = pRet->rsp; pRsp = pRet->rsp;
ppsem = &pRet->psem;
} }
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) { if (tsdbInsertData(pVnode->tsdb, pCont, pRsp, ppsem) < 0) {
code = terrno; code = terrno;
} else { } else {
if (pRsp != NULL) atomic_fetch_add_64(&tsSubmitReqSucNum, 1); if (pRsp != NULL) atomic_fetch_add_64(&tsSubmitReqSucNum, 1);
......
...@@ -189,9 +189,12 @@ python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanosub ...@@ -189,9 +189,12 @@ python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanosub
python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestInsertTime_step.py python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestInsertTime_step.py
python3 test.py -f tools/taosdumpTestNanoSupport.py python3 test.py -f tools/taosdumpTestNanoSupport.py
# # tsdb
python3 ./test.py -f tsdb/insert.py
# python3 ./test.py -f tsdb/tsdbComp.py # python3 ./test.py -f tsdb/tsdbComp.py
# update # update
python3 ./test.py -f update/allow_update.py python3 ./test.py -f update/allow_update.py
python3 ./test.py -f update/allow_update-0.py python3 ./test.py -f update/allow_update-0.py
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册