提交 eeba8d02 编写于 作者: A Alex Duan

[TS-238]<feature>(tsdb): init delete function

上级 b8ae42ff
......@@ -1153,7 +1153,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
code = validateTableName(tableToken.z, tableToken.n, &sTblToken, &dbIncluded2);
if (code != TSDB_CODE_SUCCESS) {
return tscInvalidOperationMsg(pInsertParam->msg, "invalid table name", *sqlstr);
return tscInvalidOperationMsg(pInsertParam->msg, STR_INVALID_TABLE_NAME, *sqlstr);
}
int32_t ret = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded2);
......@@ -1441,7 +1441,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
bool dbIncluded = false;
// Check if the table name available or not
if (validateTableName(sToken.z, sToken.n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
code = tscInvalidOperationMsg(pInsertParam->msg, "table name invalid", sToken.z);
code = tscInvalidOperationMsg(pInsertParam->msg, STR_INVALID_TABLE_NAME, sToken.z);
goto _clean;
}
......
......@@ -100,7 +100,7 @@ static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery, bool timeWindowQuery);
static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery);
static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery, bool delData);
static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t validateRangeNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema);
......@@ -560,7 +560,7 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pInfo == NULL || pSql == NULL) {
return TSDB_CODE_TSC_APP_ERROR;
}
......@@ -759,8 +759,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
case TSDB_SQL_DESCRIBE_TABLE: {
const char* msg1 = "invalid table name";
SStrToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
bool dbIncluded = false;
char buf[TSDB_TABLE_FNAME_LEN];
......@@ -768,7 +766,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
sTblToken.z = buf;
if (validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
// additional msg has been attached already
......@@ -781,8 +779,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
case TSDB_SQL_SHOW_CREATE_STABLE:
case TSDB_SQL_SHOW_CREATE_TABLE: {
const char* msg1 = "invalid table name";
SStrToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
bool dbIncluded = false;
......@@ -791,7 +787,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
sTblToken.z = buf;
if (validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded);
......@@ -1050,6 +1046,39 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
break;
}
case TSDB_SQL_DELETE_DATA: {
// CHECK AND SET TABLE NAME
SStrToken* tbName = &pInfo->pDelData->tableName;
bool dbIncluded = false;
char buf[TSDB_TABLE_FNAME_LEN];
SStrToken sTblToken;
sTblToken.z = buf;
// check
if (validateTableName(tbName->z, tbName->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
// set
code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// get table meta
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code ; // async load table meta
}
// CHECK AND SET WHERE
if (pInfo->pDelData->pWhere) {
// origin check
pQueryInfo = tscGetQueryInfo(pCmd);
if (validateWhereNode(pQueryInfo, &pInfo->pDelData->pWhere, pSql, false, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
break;
}
default:
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
}
......@@ -4801,7 +4830,7 @@ static int32_t getColQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlEx
};
if (pQueryInfo->colCond == NULL) {
pQueryInfo->colCond = taosArrayInit(2, sizeof(SCond));
pQueryInfo->colCond = taosArrayInit(2, sizeof(STblCond));
}
taosArrayPush(pQueryInfo->colCond, &cond);
......@@ -5577,12 +5606,13 @@ void convertWhereStringCharset(tSqlExpr* pRight){
static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr,
int32_t* type, int32_t* tbIdx, int32_t parentOptr, tSqlExpr** columnExpr,
tSqlExpr** tsExpr, bool joinQuery) {
tSqlExpr** tsExpr, bool joinQuery, bool delData) {
const char* msg1 = "table query cannot use tags filter";
const char* msg2 = "illegal column name";
const char* msg4 = "too many join tables";
const char* msg5 = "not support ordinary column join";
const char* msg6 = "illegal condition expression";
const char* msg7 = "only allow first timestamp column and tag column";
tSqlExpr* pLeft = (*pExpr)->pLeft;
tSqlExpr* pRight = (*pExpr)->pRight;
......@@ -5607,6 +5637,16 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
// delete where condition check , column must ts or tag
if (delData) {
if (!((pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) ||
index.columnIndex >= tscGetNumOfColumns(pTableMeta) ||
index.columnIndex == TSDB_TBNAME_COLUMN_INDEX)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
}
// validate the null expression
int32_t code = validateNullExpr(*pExpr, pTableMeta, index.columnIndex, tscGetErrorMsgPayload(pCmd));
......@@ -5626,8 +5666,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
return code;
}
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
if (pSchema->type == TSDB_DATA_TYPE_NCHAR){
convertWhereStringCharset(pRight);
}
......@@ -5776,7 +5814,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr,
int32_t* type, int32_t* tbIdx, int32_t parentOptr, tSqlExpr** columnExpr,
tSqlExpr** tsExpr, bool joinQuery) {
tSqlExpr** tsExpr, bool joinQuery, bool delData) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
......@@ -5808,12 +5846,12 @@ int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr
int32_t rightTbIdx = 0;
if (!tSqlExprIsParentOfLeaf(*pExpr)) {
ret = getQueryCondExpr(pCmd, pQueryInfo, &(*pExpr)->pLeft, pCondExpr, type ? &leftType : NULL, &leftTbIdx, (*pExpr)->tokenId, &columnLeft, &tsLeft, joinQuery);
ret = getQueryCondExpr(pCmd, pQueryInfo, &(*pExpr)->pLeft, pCondExpr, type ? &leftType : NULL, &leftTbIdx, (*pExpr)->tokenId, &columnLeft, &tsLeft, joinQuery, delData);
if (ret != TSDB_CODE_SUCCESS) {
goto err_ret;
}
ret = getQueryCondExpr(pCmd, pQueryInfo, &(*pExpr)->pRight, pCondExpr, type ? &rightType : NULL, &rightTbIdx, (*pExpr)->tokenId, &columnRight, &tsRight, joinQuery);
ret = getQueryCondExpr(pCmd, pQueryInfo, &(*pExpr)->pRight, pCondExpr, type ? &rightType : NULL, &rightTbIdx, (*pExpr)->tokenId, &columnRight, &tsRight, joinQuery, delData);
if (ret != TSDB_CODE_SUCCESS) {
goto err_ret;
}
......@@ -5868,7 +5906,7 @@ int32_t getQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr
goto err_ret;
}
ret = handleExprInQueryCond(pCmd, pQueryInfo, pExpr, pCondExpr, type, tbIdx, parentOptr, columnExpr, tsExpr, joinQuery);
ret = handleExprInQueryCond(pCmd, pQueryInfo, pExpr, pCondExpr, type, tbIdx, parentOptr, columnExpr, tsExpr, joinQuery, delData);
if (ret) {
goto err_ret;
}
......@@ -6415,7 +6453,7 @@ _ret:
int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery) {
int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql, bool joinQuery, bool delData) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
......@@ -6443,7 +6481,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
}
#endif
if ((ret = getQueryCondExpr(&pSql->cmd, pQueryInfo, pExpr, &condExpr, etype, &tbIdx, (*pExpr)->tokenId, &condExpr.pColumnCond, &condExpr.pTimewindow, joinQuery)) != TSDB_CODE_SUCCESS) {
if ((ret = getQueryCondExpr(&pSql->cmd, pQueryInfo, pExpr, &condExpr, etype, &tbIdx, (*pExpr)->tokenId, &condExpr.pColumnCond, &condExpr.pTimewindow, joinQuery, delData)) != TSDB_CODE_SUCCESS) {
goto PARSE_WHERE_EXIT;
}
......@@ -8928,7 +8966,6 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
}
int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg1 = "invalid table name";
const char* msg3 = "tag value too long";
const char* msg4 = "illegal value or data overflow";
const char* msg5 = "tags number not matched";
......@@ -8964,7 +9001,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t code = validateTableName(pToken->z, pToken->n, &sTblToken, &dbIncluded);
if (code != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
code = tscSetTableFullName(&pStableMetaInfo->name, &sTblToken, pSql, dbIncluded);
......@@ -9171,7 +9208,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
bool dbIncluded2 = false;
// table name
if (tscValidateName(&(pCreateTableInfo->name), true, &dbIncluded2) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);
......@@ -9183,7 +9220,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
pCreateTableInfo->fullname = calloc(1, tNameLen(&pTableMetaInfo->name) + 1);
ret = tNameExtractFullName(&pTableMetaInfo->name, pCreateTableInfo->fullname);
if (ret != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
}
......@@ -9191,7 +9228,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
}
int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg1 = "invalid table name";
const char* msg2 = "functions not allowed in CQ";
const char* msg3 = "fill only available for interval query";
const char* msg4 = "fill option not supported in stream computing";
......@@ -9215,14 +9251,14 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
bool dbIncluded1 = false;
if (tscValidateName(pName, true, &dbIncluded1) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
// check to valid and create to name
if(pInfo->pCreateTableInfo->to.n > 0) {
bool dbInclude = false;
if (tscValidateName(&pInfo->pCreateTableInfo->to, false, &dbInclude) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
int32_t code = tscSetTableFullName(&pInfo->pCreateTableInfo->toSName, &pInfo->pCreateTableInfo->to, pSql, dbInclude);
if(code != TSDB_CODE_SUCCESS) {
......@@ -9249,7 +9285,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t code = validateTableName(srcToken.z, srcToken.n, &sTblToken, &dbIncluded2);
if (code != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql, dbIncluded2);
......@@ -9269,7 +9305,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
if (pSqlNode->pWhere != NULL) { // query condition in stream computing
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery) != TSDB_CODE_SUCCESS) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
......@@ -9681,7 +9717,6 @@ int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* p
}
static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, char* msgBuf, SSqlObj* pSql) {
const char* msg1 = "invalid table name";
int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list);
assert(pSqlNode->from->type == SQL_NODE_FROM_TABLELIST);
......@@ -9691,7 +9726,7 @@ static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList
SStrToken* t = &item->tableName;
if (t->type == TK_INTEGER || t->type == TK_FLOAT) {
return invalidOperationMsg(msgBuf, msg1);
return invalidOperationMsg(msgBuf, STR_INVALID_TABLE_NAME);
}
bool dbIncluded = false;
......@@ -9700,7 +9735,7 @@ static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList
sTblToken.z = buf;
if (validateTableName(t->z, t->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(msgBuf, msg1);
return invalidOperationMsg(msgBuf, STR_INVALID_TABLE_NAME);
}
SName name = {0};
......@@ -9959,7 +9994,6 @@ _end:
}
static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, int32_t numOfTables) {
const char* msg1 = "invalid table name";
const char* msg2 = "invalid table alias name";
const char* msg3 = "alias name too long";
const char* msg4 = "self join not allowed";
......@@ -9980,7 +10014,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
SStrToken *oriName = &item->tableName;
if (oriName->type == TK_INTEGER || oriName->type == TK_FLOAT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
bool dbIncluded = false;
......@@ -9989,7 +10023,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
sTblToken.z = buf;
if (validateTableName(oriName->z, oriName->n, &sTblToken, &dbIncluded) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), STR_INVALID_TABLE_NAME);
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
......@@ -10258,7 +10292,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
// validate the query filter condition info
if (pSqlNode->pWhere != NULL) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery) != TSDB_CODE_SUCCESS) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
} else {
......@@ -10363,7 +10397,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->onlyHasTagCond = true;
// set where info
if (pSqlNode->pWhere != NULL) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery) != TSDB_CODE_SUCCESS) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql, joinQuery, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......
......@@ -3313,6 +3313,77 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
return code;
}
// Super Table
int buildSTableDelDataMsg(SSqlObj *pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SSqlInfo *pInfo) {
return 0;
}
// Normal Child Table
int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SSqlInfo *pInfo) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// pSql->cmd.payloadLen is set during copying data into payload
pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
tscDebug("0x%"PRIx64" table deldata submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps);
// set payload
size_t payloadLen = sizeof(SMsgDesc) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + sizeof(SControlData);
int32_t ret = tscAllocPayload(pCmd, payloadLen);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
pCmd->payloadLen = payloadLen;
char* p = pCmd->payload;
SMsgDesc* pMsgDesc = (SMsgDesc* )p;
p += sizeof(SMsgDesc);
SSubmitMsg* pSubmitMsg = (SSubmitMsg* )p;
p += sizeof(SSubmitMsg);
SSubmitBlk* pSubmitBlk = (SSubmitBlk*)p;
p += sizeof(SSubmitBlk);
SControlData* pControlData = (SControlData* )p;
// SMsgDesc
pMsgDesc->numOfVnodes = htonl(1);
// SSubmitMsg
int32_t size = pCmd->payloadLen - sizeof(SMsgDesc);
pSubmitMsg->header.vgId = htonl(pTableMeta->vgId);
pSubmitMsg->header.contLen = htonl(size);
pSubmitMsg->length = pSubmitMsg->header.contLen;
pSubmitMsg->numOfBlocks = htonl(1);
// SSubmitBlk
pSubmitBlk->flag = FLAG_BLK_CONTROL; // this is control block
pSubmitBlk->tid = htonl(pTableMeta->id.tid);
pSubmitBlk->uid = htobe64(pTableMeta->id.uid);
pSubmitBlk->numOfRows = htons(1);
pSubmitBlk->schemaLen = 0; // only server return TSDB_CODE_TDB_TABLE_RECONFIGURE need schema attached
pSubmitBlk->sversion = htonl(pTableMeta->sversion);
pSubmitBlk->dataLen = htonl(sizeof(SControlData));
// SControlData
pControlData->command = htonl(CMD_DELETE_DATA);
pControlData->win.skey = htobe64(pQueryInfo->window.skey);
pControlData->win.ekey = htobe64(pQueryInfo->window.ekey);
return TSDB_CODE_SUCCESS;
}
int tscBuildDelDataMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if(UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return buildSTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo);
} else {
return buildTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo);
}
}
void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
......@@ -3352,6 +3423,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_DELETE_DATA] = tscBuildDelDataMsg;
tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
......
......@@ -35,6 +35,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DELETE_DATA, "delete-data" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
// the SQL below is for mgmt node
......
......@@ -141,4 +141,7 @@ int32_t tNameSetAcctId(SName* dst, const char* acct);
int32_t tNameSetDbName(SName* dst, const char* acct, SStrToken* dbToken);
// define uniform string
#define STR_INVALID_TABLE_NAME "invalid table name"
#endif // TDENGINE_NAME_H
......@@ -513,7 +513,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pBlk->tid = htonl(pObj->tid);
pBlk->numOfRows = htons(1);
pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0;
pBlk->flag = 0;
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow);
......
......@@ -17,6 +17,7 @@
#include "os.h"
#include "tqueue.h"
#include "dnodeVWrite.h"
#include "tthread.h"
typedef struct {
taos_qall qall;
......@@ -161,6 +162,29 @@ void dnodeFreeVWriteQueue(void *pWqueue) {
taosCloseQueue(pWqueue);
}
void* waitingResultThread(void* param) {
SVWriteMsg* pWrite = (SVWriteMsg* )param;
int32_t ret = sem_wait(pWrite->rspRet.psem_rsp);
if(ret == 0) {
// success
}
sem_destroy(pWrite->rspRet.psem_rsp);
// wait ok
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len,
.code = pWrite->code,
};
rpcSendResponse(&rpcRsp);
// remove from thread manager
vnodeRemoveWait(pWrite->pVnode, pWrite);
vnodeFreeFromWQueue(pWrite->pVnode, pWrite);
return NULL;
}
void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (wparam == NULL) return;
SVWriteMsg *pWrite = wparam;
......@@ -170,15 +194,22 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (count <= 1) return;
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len,
.code = pWrite->code,
};
if(pWrite->rspRet.psem_rsp == 0) {
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len,
.code = pWrite->code,
};
rpcSendResponse(&rpcRsp);
vnodeFreeFromWQueue(pVnode, pWrite);
rpcSendResponse(&rpcRsp);
vnodeFreeFromWQueue(pVnode, pWrite);
} else {
// need async to wait result in another thread
pthread_t* thread = taosCreateThread(waitingResultThread, pWrite);
// add to wait thread manager
vnodeAddWait(pVnode, thread, pWrite->rspRet.psem_rsp, pWrite);
}
}
static void *dnodeProcessVWriteQueue(void *wparam) {
......
......@@ -120,6 +120,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
// delete
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DELDATA, "delete-data" )
#ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105
#endif
......@@ -195,6 +199,9 @@ enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
// SSubmitBlk->flag define
#define FLAG_BLK_CONTROL 0x00000001 // SSubmitBlk is a control block to submit
#define IS_CONTROL_BLOCK(x) (x->flag & FLAG_BLK_CONTROL)
extern char *taosMsg[];
......@@ -219,7 +226,7 @@ typedef struct SMsgHead {
typedef struct SSubmitBlk {
uint64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t flag; // extend special information, can see FLAG_BLK_??? define
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
......@@ -996,6 +1003,12 @@ typedef struct {
char value[];
} STLV;
#define CMD_DELETE_DATA 0x00000001
typedef struct SControlData{
uint32_t command; // see define CMD_???
STimeWindow win;
} SControlData;
enum {
TLV_TYPE_END_MARK = -1,
//TLV_TYPE_DUMMY = 1,
......
......@@ -160,7 +160,7 @@ typedef struct {
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp, sem_t* pSem);
// -- FOR QUERY TIME SERIES DATA
......
......@@ -177,48 +177,49 @@
#define TK_CONNECTION 159
#define TK_STREAM 160
#define TK_COLON 161
#define TK_ABORT 162
#define TK_AFTER 163
#define TK_ATTACH 164
#define TK_BEFORE 165
#define TK_BEGIN 166
#define TK_CASCADE 167
#define TK_CLUSTER 168
#define TK_CONFLICT 169
#define TK_COPY 170
#define TK_DEFERRED 171
#define TK_DELIMITERS 172
#define TK_DETACH 173
#define TK_EACH 174
#define TK_END 175
#define TK_EXPLAIN 176
#define TK_FAIL 177
#define TK_FOR 178
#define TK_IGNORE 179
#define TK_IMMEDIATE 180
#define TK_INITIALLY 181
#define TK_INSTEAD 182
#define TK_KEY 183
#define TK_OF 184
#define TK_RAISE 185
#define TK_REPLACE 186
#define TK_RESTRICT 187
#define TK_ROW 188
#define TK_STATEMENT 189
#define TK_TRIGGER 190
#define TK_VIEW 191
#define TK_IPTOKEN 192
#define TK_SEMI 193
#define TK_NONE 194
#define TK_PREV 195
#define TK_LINEAR 196
#define TK_IMPORT 197
#define TK_TBNAME 198
#define TK_JOIN 199
#define TK_INSERT 200
#define TK_INTO 201
#define TK_VALUES 202
#define TK_FILE 203
#define TK_DELETE 162
#define TK_ABORT 163
#define TK_AFTER 164
#define TK_ATTACH 165
#define TK_BEFORE 166
#define TK_BEGIN 167
#define TK_CASCADE 168
#define TK_CLUSTER 169
#define TK_CONFLICT 170
#define TK_COPY 171
#define TK_DEFERRED 172
#define TK_DELIMITERS 173
#define TK_DETACH 174
#define TK_EACH 175
#define TK_END 176
#define TK_EXPLAIN 177
#define TK_FAIL 178
#define TK_FOR 179
#define TK_IGNORE 180
#define TK_IMMEDIATE 181
#define TK_INITIALLY 182
#define TK_INSTEAD 183
#define TK_KEY 184
#define TK_OF 185
#define TK_RAISE 186
#define TK_REPLACE 187
#define TK_RESTRICT 188
#define TK_ROW 189
#define TK_STATEMENT 190
#define TK_TRIGGER 191
#define TK_VIEW 192
#define TK_IPTOKEN 193
#define TK_SEMI 194
#define TK_NONE 195
#define TK_PREV 196
#define TK_LINEAR 197
#define TK_IMPORT 198
#define TK_TBNAME 199
#define TK_JOIN 200
#define TK_INSERT 201
#define TK_INTO 202
#define TK_VALUES 203
#define TK_FILE 204
#define TK_SPACE 300
......
......@@ -32,6 +32,7 @@ typedef struct {
int32_t len;
void * rsp;
void * qhandle; // used by query and retrieve msg
sem_t* psem_rsp; // if it is not zero, need wait result with async
} SRspRet;
typedef struct {
......@@ -58,6 +59,13 @@ typedef struct {
SWalHead walHead;
} SVWriteMsg;
typedef struct {
int32_t startTime;
pthread_t* pthread;
sem_t* psem;
void* param;
} SWaitThread;
// vnodeStatus
extern char *vnodeStatus[];
......@@ -98,6 +106,10 @@ int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qt
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
// wait thread
void vnodeAddWait(void* pVnode, pthread_t* pthread, sem_t* psem, void* param);
void vnodeRemoveWait(void* pVnode, void* param);
#ifdef __cplusplus
}
#endif
......
......@@ -252,6 +252,12 @@ typedef struct SMiscInfo {
};
} SMiscInfo;
typedef struct SDelData {
bool existsCheck;
SStrToken tableName;
struct tSqlExpr* pWhere;
} SDelData;
typedef struct SSqlInfo {
int32_t type;
bool valid;
......@@ -262,6 +268,7 @@ typedef struct SSqlInfo {
SCreateTableSql *pCreateTableInfo;
SAlterTableInfo *pAlterInfo;
SMiscInfo *pMiscInfo;
SDelData *pDelData;
};
} SSqlInfo;
......@@ -364,6 +371,9 @@ void tSetDbName(SStrToken *pCpxName, SStrToken *pDb);
void tSetColumnInfo(TAOS_FIELD *pField, SStrToken *pName, TAOS_FIELD *pType);
void tSetColumnType(TAOS_FIELD *pField, SStrToken *type);
// malloc new SDelData and set with args
SDelData *tGetDelData(SStrToken *pTableName, SStrToken* existsCheck, tSqlExpr* pWhere);
/**
*
* @param yyp The parser
......
......@@ -984,7 +984,16 @@ cmd ::= KILL CONNECTION INTEGER(Y). {setKillSql(pInfo, TSDB_SQL_KILL_CONNECTIO
cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_STREAM, &X);}
cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_QUERY, &X);}
//////////////////////////////////// DEL TRUNCATE TABLE /////////////////////////////////////
//1 DELETE FROM TBNAME/STBNAME WHERE TS AND TAG CONDICTION
cmd ::= DELETE FROM ifexists(Y) ids(X) cpxName(Z) where_opt(W). {
X.n += Z.n;
SDelData * pDelData = tGetDelData(&X, &Y, W);
setSqlInfo(pInfo, pDelData, NULL, TSDB_SQL_DELETE_DATA);
}
%fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
LIKE MATCH NMATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL
NOW TODAY IPTOKEN SEMI NONE PREV LINEAR IMPORT TBNAME JOIN STABLE NULL INSERT INTO VALUES FILE.
NOW TODAY IPTOKEN SEMI NONE PREV LINEAR IMPORT TBNAME JOIN STABLE NULL INSERT INTO VALUES FILE.
\ No newline at end of file
......@@ -1310,6 +1310,22 @@ void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrTo
pInfo->pMiscInfo->tableType = tableType;
}
void setTruncateTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck, int16_t dbType, int16_t tableType) {
pInfo->type = type;
if (pInfo->pMiscInfo == NULL) {
pInfo->pMiscInfo = (SMiscInfo *)calloc(1, sizeof(SMiscInfo));
pInfo->pMiscInfo->a = taosArrayInit(4, sizeof(SStrToken));
}
taosArrayPush(pInfo->pMiscInfo->a, pToken);
pInfo->pMiscInfo->existsCheck = (existsCheck->n == 1);
pInfo->pMiscInfo->dbType = dbType;
pInfo->pMiscInfo->tableType = tableType;
}
void setDropFuncInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken) {
pInfo->type = type;
......@@ -1474,3 +1490,15 @@ void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo) {
pDBInfo->dbType = TSDB_DB_TYPE_TOPIC;
pDBInfo->partitions = TSDB_DEFAULT_DB_PARTITON_OPTION;
}
// malloc new SDelData and set with args
SDelData* tGetDelData(SStrToken* pTableName, SStrToken* existsCheck, tSqlExpr* pWhere) {
// malloc
SDelData* pDelData = (SDelData *) calloc(1, sizeof(SDelData));
// set value
pDelData->existsCheck = (existsCheck->n == 1);
pDelData->tableName = *pTableName;
pDelData->pWhere = pWhere;
return pDelData;
}
\ No newline at end of file
此差异已折叠。
......@@ -49,15 +49,17 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIte
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row);
static int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, sem_t* pSem);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now);
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp, sem_t* pSem) {
STsdbRepo * pRepo = repo;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
int32_t affectedrows = 0, numOfRows = 0;
int32_t ret = TSDB_CODE_SUCCESS;
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
......@@ -70,8 +72,16 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) {
return -1;
if (IS_CONTROL_BLOCK(pBlock)) {
// COMMAND DATA BLOCK
ret = tsdbInsertControlData(pRepo, pBlock, pRsp, pSem);
// all control msg is one SSubmitMsg, so need return
return ret;
} else {
// INSERT DATA BLOCK
if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) {
return -1;
}
}
numOfRows += pBlock->numOfRows;
}
......@@ -82,7 +92,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
}
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0;
return ret;
}
// ---------------- INTERNAL FUNCTIONS ----------------
......@@ -702,10 +712,13 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
}
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) {
return -1;
// check each row time invalid if not control block
if (!IS_CONTROL_BLOCK(pBlock)) {
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) {
return -1;
}
}
}
}
......@@ -1082,3 +1095,39 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
return 0;
}
// Delete Data
int32_t tsdbInsertDeleteData(STsdbRepo* pRepo, SControlData* pCtlData, SShellSubmitRspMsg *pRsp, sem_t* pSem) {
pRsp->affectedRows = htonl(99);
// INIT SEM
int32_t ret = sem_init(pSem, 0, 0);
if(ret != 0) {
return TAOS_SYSTEM_ERROR(ret);
}
// CREATE DELETE MEMTABLE
// FORCE COMMIT ALL MEM AND IMEM
return 0;
}
// Control Data
int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, sem_t* pSem) {
int32_t ret = TSDB_CODE_SUCCESS;
assert(pBlock->dataLen == sizeof(SControlData));
SControlData* pCtlData = (SControlData* )pBlock->data;
// anti-serialize
pCtlData->command = htonl(pCtlData->command);
pCtlData->win.skey = htobe64(pCtlData->win.skey);
pCtlData->win.ekey = htobe64(pCtlData->win.ekey);
if(pCtlData->command == CMD_DELETE_DATA) {
ret = tsdbInsertDeleteData(pRepo, pCtlData, pRsp, pSem);
}
return ret;
}
\ No newline at end of file
......@@ -400,8 +400,8 @@ static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) {
static void tsdbDestroyTruncateH(STruncateH *pTruncateH) {
pTruncateH->pDCols = tdFreeDataCols(pTruncateH->pDCols);
pTruncateH->aSupBlk = taosArrayDestroy(pTruncateH->aSupBlk);
pTruncateH->aBlkIdx = taosArrayDestroy(pTruncateH->aBlkIdx);
pTruncateH->aSupBlk = taosArrayDestroy(&pTruncateH->aSupBlk);
pTruncateH->aBlkIdx = taosArrayDestroy(&pTruncateH->aBlkIdx);
tsdbDestroyTruncateTblArray(pTruncateH);
tsdbDestroyReadH(&(pTruncateH->readh));
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
......@@ -453,7 +453,7 @@ static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH) {
tfree(pTblHandle->pInfo);
}
pTruncateH->tblArray = taosArrayDestroy(pTruncateH->tblArray);
pTruncateH->tblArray = taosArrayDestroy(&pTruncateH->tblArray);
}
static int tsdbCacheFSetIndex(STruncateH *pTruncateH) {
......
......@@ -74,7 +74,7 @@ static int insertData(SInsertInfo *pInfo) {
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding);
pBlock->flag = htonl(pBlock->flag);
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
......
......@@ -47,6 +47,8 @@ void *taosAllocateQitem(int size);
void taosFreeQitem(void *item);
int taosWriteQitem(taos_queue, int type, void *item);
int taosReadQitem(taos_queue, int *type, void **pitem);
// special type search Qitem
int taosSearchQitem(taos_queue, int type, void **pitem);
taos_qall taosAllocateQall();
void taosFreeQall(taos_qall);
......
......@@ -160,6 +160,47 @@ int taosReadQitem(taos_queue param, int *type, void **pitem) {
return code;
}
// search Qitem with type
int taosSearchQitem(taos_queue param, int type, void **pitem) {
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = NULL;
STaosQnode *pPre = NULL;
int code = 0;
pthread_mutex_lock(&queue->mutex);
pNode = queue->head;
while (pNode) {
if(pNode->type == type) {
// found
*pitem = pNode->item;
if(pPre == NULL) {
queue->head = pNode->next;
} else {
pPre->next = pNode->next;
}
if (queue->head == NULL)
queue->tail = NULL;
// reduce number
queue->numOfItems--;
if (queue->qset) {
atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
}
code = 1;
uDebug("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, type, queue->numOfItems);
break;
}
// move next
pPre = pNode;
pNode = pNode->next;
}
pthread_mutex_unlock(&queue->mutex);
return code;
}
void *taosAllocateQall() {
void *p = calloc(sizeof(STaosQall), 1);
return p;
......
......@@ -228,7 +228,8 @@ static SKeyword keywordTable[] = {
{"RANGE", TK_RANGE},
{"CONTAINS", TK_CONTAINS},
{"TO", TK_TO},
{"SPLIT", TK_SPLIT}
{"SPLIT", TK_SPLIT},
{"DELETE", TK_DELETE}
};
static const char isIdChar[] = {
......
......@@ -75,6 +75,7 @@ typedef struct {
tsem_t sem;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex;
void * tqueue; // async threads queue
} SVnodeObj;
#ifdef __cplusplus
......
......@@ -29,6 +29,9 @@
#include "vnodeWorker.h"
#include "vnodeBackup.h"
#include "vnodeMain.h"
#include "tqueue.h"
#include "tthread.h"
#include "tcrc32c.h"
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
......@@ -465,6 +468,42 @@ int32_t vnodeOpen(int32_t vgId) {
return TSDB_CODE_SUCCESS;
}
#define LOOP_CNT 10
void freeWaitThread(SVnodeObj* pVnode) {
// check wait thread empty
int type = 0;
SWaitThread* pWaitThread = NULL;
while(taosReadQitem(pVnode->tqueue, &type, (void** )&pWaitThread) > 0) {
// thread is running
int32_t loop = LOOP_CNT;
while (taosThreadRunning(pWaitThread->pthread)) {
// only post once
if(loop == LOOP_CNT)
tsem_post(pWaitThread->psem);
taosMsleep(50);
loop -= 1;
if(loop == 0 )
break;
}
// free all
if(loop == 0) {
// thread not stop , so need kill
taosDestoryThread(pWaitThread->pthread);
// write msg need remove from queue
SVWriteMsg* pWrite = (SVWriteMsg* )pWaitThread->param;
if (pWrite)
vnodeFreeFromWQueue(pWrite->pVnode, pWrite);
} else {
free(pWaitThread->pthread);
}
tsem_destroy(pWaitThread->psem);
taosFreeQitem(pWaitThread);
}
taosCloseQueue(pVnode->tqueue);
}
int32_t vnodeClose(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquireNotClose(vgId);
if (pVnode == NULL) return 0;
......@@ -475,6 +514,10 @@ int32_t vnodeClose(int32_t vgId) {
pVnode->preClose = 1;
// wait result threads need deal
if(pVnode->tqueue)
freeWaitThread(pVnode);
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vnodeRelease(pVnode);
vnodeCleanUp(pVnode);
......@@ -614,3 +657,37 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return 0;
}
// wait thread
void vnodeAddWait(void* vparam, pthread_t* pthread, sem_t* psem, void* param) {
SVnodeObj* pVnode = (SVnodeObj* )vparam;
if(pVnode->tqueue == NULL) {
pVnode->tqueue = taosOpenQueue();
}
SWaitThread* pWaitThread = (SWaitThread* )taosAllocateQitem(sizeof(SWaitThread));
pWaitThread->pthread = pthread;
pWaitThread->startTime = taosGetTimestampSec();
pWaitThread->psem = psem;
pWaitThread->param = param;
int32_t crc = crc32c_sf(0, (crc_stream)pWaitThread, sizeof(void* ));
taosWriteQitem(pVnode->tqueue, crc, pWaitThread);
}
// called in wait thread
void vnodeRemoveWait(void* vparam, void* param) {
SVnodeObj* pVnode = (SVnodeObj* )vparam;
int32_t crc = crc32c_sf(0, (crc_stream)param, sizeof(void* ));
SWaitThread* pWaitThread = NULL;
taosSearchQitem(pVnode->tqueue, crc, (void** )&pWaitThread);
if (pWaitThread == NULL) {
// not found
return ;
}
// free thread
free(pWaitThread->pthread);
taosFreeQitem(pWaitThread);
}
\ No newline at end of file
......@@ -169,7 +169,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
pRsp = pRet->rsp;
}
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) {
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp, pRet->psem_rsp) < 0) {
code = terrno;
} else {
if (pRsp != NULL) atomic_fetch_add_64(&tsSubmitReqSucNum, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册