“02bb9b6334958d6050602e720c6d2cf436c6e9f1”上不存在“tests/pytest/functions/function_percentile2.py”
提交 0ee0abda 编写于 作者: K kailixu

Merge branch '2.6' into fix/TS-2670-2.6

......@@ -151,6 +151,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscRestoreTableDataBlocks(SInsertStatementParam *pInsertParam);
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......
......@@ -406,6 +406,7 @@ typedef struct SSqlObj {
int64_t lastAlive;
void * pPrevContext;
bool enableBatch;
bool needUpdateMeta;
} SSqlObj;
typedef struct SSqlStream {
......
......@@ -1736,6 +1736,16 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error;
}
} else if (code != TSDB_CODE_SUCCESS) {
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->res.code && rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
goto _error;
}
......
......@@ -1179,7 +1179,7 @@ static int doSmlInsertOneDataPoint(TAOS* taos, TAOS_SML_DATA_POINT* point, SSmlL
sqlLen += retLen;
}
--sqlLen;
retLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ") values (");
retLen = snprintf(sql + sqlLen, freeBytes - sqlLen, ") values (");
if (retLen >= freeBytes - sqlLen) {
tscError("SML:0x%" PRIx64 " no free space for building sql", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -1168,20 +1168,11 @@ static int insertStmtReset(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
static int insertStmtExecute(STscStmt* stmt) {
static int insertStmtExecuteImpl(STscStmt* stmt, STableMetaInfo* pTableMetaInfo, bool schemaAttached) {
SSqlCmd* pCmd = &stmt->pSql->cmd;
if (pCmd->batchSize == 0) {
tscError("no records bind");
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "no records bind");
}
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) {
return TSDB_CODE_SUCCESS;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
stmt->pSql->cmd.insertParam.schemaAttached = schemaAttached ? 1 : 0;
if (pCmd->insertParam.pTableBlockHashList == NULL) {
pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
......@@ -1198,6 +1189,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pBlk->dataLen = 0;
pBlk->uid = pTableMeta->id.uid;
pBlk->tid = pTableMeta->id.tid;
pBlk->sversion = pTableMeta->sversion;
fillTablesColumnsNull(stmt->pSql);
......@@ -1219,8 +1211,54 @@ static int insertStmtExecute(STscStmt* stmt) {
tscBuildAndSendRequest(pSql, NULL);
return TSDB_CODE_SUCCESS;
}
static int insertStmtExecute(STscStmt* stmt) {
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &stmt->pSql->cmd;
SSqlObj* pSql = stmt->pSql;
if (pCmd->batchSize == 0) {
tscError("no records bind");
return invalidOperationMsg(tscGetErrorMsgPayload(&stmt->pSql->cmd), "no records bind");
}
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) {
return code;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
code = insertStmtExecuteImpl(stmt, pTableMetaInfo, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
while (pSql->retry < pSql->maxRetry) {
if (pSql->res.code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->retry += 1;
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
code = insertStmtExecuteImpl(stmt, pTableMetaInfo, true);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
} else {
break;
}
}
}
stmt->numOfRows += pSql->res.numOfRows;
......@@ -1277,7 +1315,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
return invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "no table name set");
}
pStmt->pSql->retry = pStmt->pSql->maxRetry + 1; //no retry
pStmt->pSql->retry = 0; // enable retry in case of reconfiguring table meta
if (taosHashGetSize(pStmt->pSql->cmd.insertParam.pTableBlockHashList) <= 0) { // merge according to vgId
tscError("0x%"PRIx64" no data block to insert", pStmt->pSql->self);
......
......@@ -1823,7 +1823,6 @@ int32_t validateOneTag(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
int32_t validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
const char* msg1 = "too many columns";
const char* msg3 = "column length too long";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid column name or length";
const char* msg6 = "invalid column length";
......@@ -1863,9 +1862,11 @@ int32_t validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
}
// length less than TSDB_MAX_BYTES_PER_ROW
if (nLen + pColField->bytes + (IS_VAR_DATA_TYPE(pColField->type) ? sizeof(VarDataOffsetT) : 0) >
TSDB_MAX_BYTES_PER_ROW) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
int32_t totalLength = nLen + pColField->bytes + (IS_VAR_DATA_TYPE(pColField->type) ? sizeof(VarDataOffsetT) : 0);
if (totalLength > TSDB_MAX_BYTES_PER_ROW) {
char errMsg[64];
sprintf(errMsg, "(%d > %d)", totalLength, TSDB_MAX_BYTES_PER_ROW);
return tscErrorMsgWithCode(TSDB_CODE_TSC_EXCEED_ROW_BYTES, tscGetErrorMsgPayload(pCmd), errMsg, NULL);
}
// field name must be unique
......
......@@ -532,6 +532,20 @@ bool shouldRewTableMeta(SSqlObj* pSql, SRpcMsg* rpcMsg) {
return true;
}
int tscHandleRenewTableMeta(SSqlObj *pSql) {
SSqlObj *rootObj = pSql->rootObj;
if (rootObj == pSql) {
return tscRenewTableMeta(pSql);
}
rootObj->res.code = pSql->res.code;
rootObj->needUpdateMeta = true;
return rootObj->res.code;
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
......@@ -611,7 +625,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
pSql->retryReason = rpcMsg->code;
rpcMsg->code = tscRenewTableMeta(pSql);
rpcMsg->code = tscHandleRenewTableMeta(pSql);
// if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle);
......@@ -3425,6 +3439,7 @@ int tscRenewTableMeta(SSqlObj *pSql) {
SSqlObj *rootSql = pSql->rootObj;
tscFreeSubobj(rootSql);
tscResetSqlCmd(&rootSql->cmd, true, rootSql->self);
rootSql->res.code = 0;
code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
taosArrayDestroyEx(&pNameList, freeElem);
......
......@@ -2046,6 +2046,19 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
goto _return;
}
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
} else {
tscAsyncResultOnError(pParentSql);
}
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
goto _return;
}
......@@ -2598,6 +2611,16 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*) tres;
int32_t c = taos_errno(pSql);
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->res.code && rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
if (c != TSDB_CODE_SUCCESS) {
SSqlObj* parent = pSup->pParent;
......@@ -3105,6 +3128,16 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
tscFreeRetrieveSup(&pSql->param);
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
......@@ -3207,6 +3240,17 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
return;
}
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
tscFreeRetrieveSup(&pSql->param);
return;
}
}
// all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
......@@ -3350,6 +3394,20 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
return;
}
SColumnModel *pModelDesc = pDesc->pColumnModel;
if (pModelDesc == NULL) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" column model has been freed", pParentSql->self, pSql->self);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_QRY_APP_ERROR);
return;
}
SColumnModel *pModelMemBuf = trsupport->pExtMemBuffer[idx]->pColumnModel;
if (pModelDesc->numOfCols != pModelMemBuf->numOfCols ||
pModelDesc->rowSize != pModelMemBuf->rowSize) {
tscError("0x%"PRIx64" sub:0x%"PRIx64 "extBuf column model is not consistent with descriptor column model", pParentSql->self, pSql->self);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_QRY_APP_ERROR);
return;
}
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret != 0) { // set no disk space error info, and abort retry
......@@ -3596,6 +3654,15 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
}
pParentObj->res.code = TSDB_CODE_SUCCESS;
if (TSDB_QUERY_HAS_TYPE(pParentObj->cmd.insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
tscDebug("0x%"PRIx64" re-try stmt with same submit data, retry:%d", pParentObj->self, pParentObj->retry);
pParentObj->retry++;
tscRestoreTableDataBlocks(&pParentObj->cmd.insertParam);
tscMergeTableDataBlocks(pParentObj, &pParentObj->cmd.insertParam, false);
tscHandleMultivnodeInsert(pParentObj);
return;
}
tscResetSqlCmd(&pParentObj->cmd, false, pParentObj->self);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
......
......@@ -2211,6 +2211,21 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
int32_t tscRestoreTableDataBlocks(SInsertStatementParam *pInsertParam) {
STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
while (iter) {
STableDataBlocks* pOneTableBlock = *iter;
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows);
iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter);
}
return TSDB_CODE_SUCCESS;
}
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
int code = 0;
......@@ -2320,8 +2335,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
// the length does not include the SSubmitBlk structure
pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1;
pBlocks->numOfRows = 0;
} else {
tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname);
}
......
......@@ -120,6 +120,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0228) //"invalid table schema version")
#define TSDB_CODE_TSC_TOO_MANY_SML_LINES TAOS_DEF_ERROR_CODE(0, 0x0229) //"too many lines in batch")
#define TSDB_CODE_TSC_SEND_DATA_FAILED TAOS_DEF_ERROR_CODE(0, 0x0230) //"Client send request data error"
#define TSDB_CODE_TSC_EXCEED_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x0231) //"Columns total length exceeds row bytes
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed"
......
Subproject commit d9ec91d5e0686911451422db60fa812eb46e58aa
Subproject commit d11f210c17a97b3edddabc05b776f37722c82bb5
......@@ -1254,6 +1254,7 @@ void tOrderDescDestroy(tOrderDescriptor *pDesc) {
}
destroyColumnModel(pDesc->pColumnModel);
pDesc->pColumnModel = NULL;
tfree(pDesc);
}
......
......@@ -127,6 +127,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_PRECISION_TYPE, "Invalid timestamp pre
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_RES_TOO_MANY, "Result set too large to be output")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_TOO_MANY_SML_LINES, "Too many lines in batch")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SEND_DATA_FAILED, "Client send request data failed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_ROW_BYTES, "Columns total length exceeds row bytes")
// mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册