提交 7df53159 编写于 作者: J jurgen

Cassandra data update + value handlers.

Execution batch refactoring.
上级 27188b98
/*
* Copyright (C) 2010-2014 Serge Rieder
* serge@jkiss.org
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.jkiss.dbeaver.model.impl.data;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jkiss.code.NotNull;
import org.jkiss.code.Nullable;
import org.jkiss.dbeaver.model.DBUtils;
import org.jkiss.dbeaver.model.data.DBDDataReceiver;
import org.jkiss.dbeaver.model.data.DBDValueHandler;
import org.jkiss.dbeaver.model.exec.*;
import org.jkiss.dbeaver.model.struct.DBSAttributeBase;
import org.jkiss.dbeaver.model.struct.DBSDataManipulator;
import org.jkiss.utils.ArrayUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Execute batch.
* Can be used in JDBC or any other underlying DB APIs
*
*/
public class ExecuteBatchImpl implements DBSDataManipulator.ExecuteBatch {
static final Log log = LogFactory.getLog(ExecuteBatchImpl.class);
private DBCStatement statement;
private final DBSAttributeBase[] attributes;
private final List<Object[]> values = new ArrayList<Object[]>();
private final DBDDataReceiver keysReceiver;
private final boolean skipSequences;
public ExecuteBatchImpl(@NotNull DBCStatement statement, @NotNull DBSAttributeBase[] attributes, @Nullable DBDDataReceiver keysReceiver, boolean skipSequences)
{
this.statement = statement;
this.attributes = attributes;
this.keysReceiver = keysReceiver;
this.skipSequences = skipSequences;
}
@Override
public void add(@NotNull Object[] attributeValues) throws DBCException
{
if (!ArrayUtils.isEmpty(attributes) && ArrayUtils.isEmpty(attributeValues)) {
throw new DBCException("Bad attribute values: " + Arrays.toString(attributeValues));
}
values.add(attributeValues);
}
@NotNull
@Override
public DBCStatistics execute() throws DBCException
{
if (statement == null) {
throw new DBCException("Execute batch closed");
}
DBDValueHandler[] handlers = new DBDValueHandler[attributes.length];
for (int i = 0; i < attributes.length; i++) {
handlers[i] = DBUtils.findValueHandler(statement.getSession(), attributes[i]);
}
boolean useBatch = statement.getSession().getDataSource().getInfo().supportsBatchUpdates();
if (values.size() <= 1) {
useBatch = false;
}
DBCStatistics statistics = new DBCStatistics();
statistics.setQueryText(statement.getQueryString());
for (Object[] rowValues : values) {
int paramIndex = 0;
for (int k = 0; k < handlers.length; k++) {
DBDValueHandler handler = handlers[k];
DBSAttributeBase attribute = attributes[k];
if (skipSequences && (attribute.isPseudoAttribute() || attribute.isAutoGenerated())) {
continue;
}
handler.bindValueObject(statement.getSession(), statement, attribute, paramIndex++, rowValues[k]);
}
if (useBatch) {
statement.addToBatch();
} else {
// Execute each row separately
long startTime = System.currentTimeMillis();
statement.executeStatement();
statistics.addExecuteTime(System.currentTimeMillis() - startTime);
long rowCount = statement.getUpdateRowCount();
if (rowCount > 0) {
statistics.addRowsUpdated(rowCount);
}
// Read keys
if (keysReceiver != null) {
readKeys(statement.getSession(), statement, keysReceiver);
}
}
}
values.clear();
if (useBatch) {
// Process batch
long startTime = System.currentTimeMillis();
int[] updatedRows = statement.executeStatementBatch();
statistics.addExecuteTime(System.currentTimeMillis() - startTime);
if (!ArrayUtils.isEmpty(updatedRows)) {
for (int rows : updatedRows) {
statistics.addRowsUpdated(rows);
}
}
}
return statistics;
}
@Override
public void close()
{
statement.close();
statement = null;
}
private void readKeys(@NotNull DBCSession session, @NotNull DBCStatement dbStat, @NotNull DBDDataReceiver keysReceiver)
throws DBCException
{
DBCResultSet dbResult;
try {
dbResult = dbStat.openGeneratedKeysResultSet();
}
catch (Throwable e) {
log.debug("Error obtaining generated keys", e); //$NON-NLS-1$
return;
}
if (dbResult == null) {
return;
}
try {
keysReceiver.fetchStart(session, dbResult);
try {
while (dbResult.nextRow()) {
keysReceiver.fetchRow(session, dbResult);
}
}
finally {
keysReceiver.fetchEnd(session);
}
}
finally {
dbResult.close();
keysReceiver.close();
}
}
}
......@@ -30,6 +30,7 @@ import org.jkiss.dbeaver.model.DBUtils;
import org.jkiss.dbeaver.model.data.*;
import org.jkiss.dbeaver.model.exec.*;
import org.jkiss.dbeaver.model.impl.DBObjectNameCaseTransformer;
import org.jkiss.dbeaver.model.impl.data.ExecuteBatchImpl;
import org.jkiss.dbeaver.model.impl.jdbc.cache.JDBCStructCache;
import org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCColumnMetaData;
import org.jkiss.dbeaver.model.impl.struct.AbstractTable;
......@@ -44,8 +45,6 @@ import org.jkiss.dbeaver.model.struct.DBSObjectContainer;
import org.jkiss.utils.ArrayUtils;
import org.jkiss.utils.CommonUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
......@@ -328,7 +327,7 @@ public abstract class JDBCTable<DATASOURCE extends DBPDataSource, CONTAINER exte
DBCStatement dbStat = session.prepareStatement(DBCStatementType.QUERY, query.toString(), false, false, keysReceiver != null);
dbStat.setStatementSource(this);
return new BatchImpl(dbStat, attributes, keysReceiver, true);
return new ExecuteBatchImpl(dbStat, attributes, keysReceiver, true);
}
@NotNull
......@@ -378,7 +377,7 @@ public abstract class JDBCTable<DATASOURCE extends DBPDataSource, CONTAINER exte
DBSAttributeBase[] attributes = ArrayUtils.concatArrays(updateAttributes, keyAttributes);
return new BatchImpl(dbStat, attributes, keysReceiver, false);
return new ExecuteBatchImpl(dbStat, attributes, keysReceiver, false);
}
@NotNull
......@@ -412,7 +411,7 @@ public abstract class JDBCTable<DATASOURCE extends DBPDataSource, CONTAINER exte
// Execute
DBCStatement dbStat = session.prepareStatement(DBCStatementType.QUERY, query.toString(), false, false, false);
dbStat.setStatementSource(this);
return new BatchImpl(dbStat, keyAttributes, null, false);
return new ExecuteBatchImpl(dbStat, keyAttributes, null, false);
}
private String getAttributeName(@NotNull DBSAttributeBase attribute) {
......@@ -465,37 +464,6 @@ public abstract class JDBCTable<DATASOURCE extends DBPDataSource, CONTAINER exte
query.append("=?"); //$NON-NLS-1$
}
private void readKeys(@NotNull DBCSession session, @NotNull DBCStatement dbStat, @NotNull DBDDataReceiver keysReceiver)
throws DBCException
{
DBCResultSet dbResult;
try {
dbResult = dbStat.openGeneratedKeysResultSet();
}
catch (Throwable e) {
log.debug("Error obtaining generated keys", e); //$NON-NLS-1$
return;
}
if (dbResult == null) {
return;
}
try {
keysReceiver.fetchStart(session, dbResult);
try {
while (dbResult.nextRow()) {
keysReceiver.fetchRow(session, dbResult);
}
}
finally {
keysReceiver.fetchEnd(session);
}
}
finally {
dbResult.close();
keysReceiver.close();
}
}
/**
* Reads and caches metadata which is required for data requests
* @param monitor progress monitor
......@@ -512,103 +480,4 @@ public abstract class JDBCTable<DATASOURCE extends DBPDataSource, CONTAINER exte
}
}
private class BatchImpl implements ExecuteBatch {
private DBCStatement statement;
private final DBSAttributeBase[] attributes;
private final List<Object[]> values = new ArrayList<Object[]>();
private final DBDDataReceiver keysReceiver;
private final boolean skipSequences;
private BatchImpl(@NotNull DBCStatement statement, @NotNull DBSAttributeBase[] attributes, @Nullable DBDDataReceiver keysReceiver, boolean skipSequences)
{
this.statement = statement;
this.attributes = attributes;
this.keysReceiver = keysReceiver;
this.skipSequences = skipSequences;
}
@Override
public void add(@NotNull Object[] attributeValues) throws DBCException
{
if (!ArrayUtils.isEmpty(attributes) && ArrayUtils.isEmpty(attributeValues)) {
throw new DBCException("Bad attribute values: " + Arrays.toString(attributeValues));
}
values.add(attributeValues);
}
@NotNull
@Override
public DBCStatistics execute() throws DBCException
{
if (statement == null) {
throw new DBCException("Execute batch closed");
}
DBDValueHandler[] handlers = new DBDValueHandler[attributes.length];
for (int i = 0; i < attributes.length; i++) {
handlers[i] = DBUtils.findValueHandler(statement.getSession(), attributes[i]);
}
boolean useBatch = statement.getSession().getDataSource().getInfo().supportsBatchUpdates();
if (values.size() <= 1) {
useBatch = false;
}
DBCStatistics statistics = new DBCStatistics();
statistics.setQueryText(statement.getQueryString());
for (Object[] rowValues : values) {
int paramIndex = 0;
for (int k = 0; k < handlers.length; k++) {
DBDValueHandler handler = handlers[k];
DBSAttributeBase attribute = attributes[k];
if (skipSequences && (attribute.isPseudoAttribute() || attribute.isAutoGenerated())) {
continue;
}
handler.bindValueObject(statement.getSession(), statement, attribute, paramIndex++, rowValues[k]);
}
if (useBatch) {
statement.addToBatch();
} else {
// Execute each row separately
long startTime = System.currentTimeMillis();
statement.executeStatement();
statistics.addExecuteTime(System.currentTimeMillis() - startTime);
long rowCount = statement.getUpdateRowCount();
if (rowCount > 0) {
statistics.addRowsUpdated(rowCount);
}
// Read keys
if (keysReceiver != null) {
readKeys(statement.getSession(), statement, keysReceiver);
}
}
}
values.clear();
if (useBatch) {
// Process batch
long startTime = System.currentTimeMillis();
int[] updatedRows = statement.executeStatementBatch();
statistics.addExecuteTime(System.currentTimeMillis() - startTime);
if (!ArrayUtils.isEmpty(updatedRows)) {
for (int rows : updatedRows) {
statistics.addRowsUpdated(rows);
}
}
}
return statistics;
}
@Override
public void close()
{
statement.close();
statement = null;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册