未验证 提交 55ee0a0e 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

fix(cairo): create table race condition fix (#787)

上级 a6419407
......@@ -236,6 +236,10 @@ JNIEXPORT jint JNICALL Java_io_questdb_std_Files_msync(JNIEnv *e, jclass cl, jlo
return msync((void *) addr, len, async ? MS_ASYNC : MS_SYNC);
}
/*
 * JNI entry point backing the Java-side Files.fsync(long) native method.
 *
 * Narrows the Java long descriptor to a C int, calls fsync() on it and hands
 * the return value of that call back to the Java caller unchanged.
 */
JNIEXPORT jint JNICALL Java_io_questdb_std_Files_fsync(JNIEnv *e, jclass cl, jlong fd) {
    const int descriptor = (int) fd;
    const int rc = fsync(descriptor);
    return rc;
}
JNIEXPORT jboolean JNICALL Java_io_questdb_std_Files_remove
(JNIEnv *e, jclass cl, jlong lpsz) {
return (jboolean) (remove((const char *) lpsz) == 0);
......
......@@ -147,6 +147,13 @@ JNIEXPORT jint JNICALL Java_io_questdb_std_Files_msync(JNIEnv *e, jclass cl, jlo
return 0;
}
/*
 * Windows counterpart of the Files.fsync(long) native method: it performs no
 * flush and always reports -1 to the Java caller.
 *
 * Windows does not seem to have fsync, or cannot fsync a directory.
 * To be fair, we never saw our destructive test fail on Windows, which leads
 * to the assumption that all directory changes on Windows are synchronous.
 */
JNIEXPORT jint JNICALL Java_io_questdb_std_Files_fsync(JNIEnv *e, jclass cl, jlong fd) {
    const jint notSupported = -1;
    return notSupported;
}
JNIEXPORT jboolean JNICALL Java_io_questdb_std_Files_setLastModified
(JNIEnv *e, jclass cl, jlong lpszName, jlong millis) {
......
......@@ -105,7 +105,7 @@ public class CairoEngine implements Closeable, WriterSource {
configuration.getFilesFacade().close(tableIndexFd);
}
public void createTable(
public void createTableUnsafe(
CairoSecurityContext securityContext,
AppendMemory mem,
Path path,
......@@ -123,6 +123,28 @@ public class CairoEngine implements Closeable, WriterSource {
);
}
/**
 * Creates a table while holding the engine lock on the table's name.
 * <p>
 * The lock is requested via {@code lock(securityContext, struct.getTableName())}.
 * When it is granted, the work is delegated to {@code createTableUnsafe} and the
 * lock is released in a {@code finally} block, so it is released whether or not
 * creation throws.
 *
 * @param securityContext security context forwarded to lock, unlock and createTableUnsafe
 * @param mem             memory object forwarded to createTableUnsafe
 * @param path            path object forwarded to createTableUnsafe
 * @param struct          table definition; its table name is the lock key
 * @throws EntryUnavailableException when the lock on the table name could not be obtained
 */
public void createTable(
        CairoSecurityContext securityContext,
        AppendMemory mem,
        Path path,
        TableStructure struct
) {
    // Guard: without the name lock nothing is created and nothing needs unlocking.
    if (!lock(securityContext, struct.getTableName())) {
        throw EntryUnavailableException.INSTANCE;
    }

    try {
        createTableUnsafe(securityContext, mem, path, struct);
    } finally {
        unlock(securityContext, struct.getTableName(), null);
    }
}
public TableWriter getBackupWriter(
CairoSecurityContext securityContext,
CharSequence tableName,
......
......@@ -130,6 +130,7 @@ public final class TableUtils {
int tableVersion,
int tableId
) {
LOG.debug().$("create table [name=").$(structure.getTableName()).$(']').$();
path.of(root).concat(structure.getTableName());
if (ff.mkdirs(path.put(Files.SEPARATOR).$(), mkDirMode) != 0) {
......@@ -138,52 +139,67 @@ public final class TableUtils {
final int rootLen = path.length();
try (AppendMemory mem = memory) {
mem.of(ff, path.trimTo(rootLen).concat(META_FILE_NAME).$(), ff.getPageSize());
final int count = structure.getColumnCount();
mem.putInt(count);
mem.putInt(structure.getPartitionBy());
mem.putInt(structure.getTimestampIndex());
mem.putInt(tableVersion);
mem.putInt(tableId);
mem.jumpTo(TableUtils.META_OFFSET_COLUMN_TYPES);
for (int i = 0; i < count; i++) {
mem.putByte((byte) structure.getColumnType(i));
long flags = 0;
if (structure.isIndexed(i)) {
flags |= META_FLAG_BIT_INDEXED;
}
final long dirFd = !ff.isRestrictedFileSystem() ? ff.openRO(path.$()) : 0;
if (dirFd != -1) {
try (AppendMemory mem = memory) {
mem.of(ff, path.trimTo(rootLen).concat(META_FILE_NAME).$(), ff.getPageSize());
final int count = structure.getColumnCount();
mem.putInt(count);
mem.putInt(structure.getPartitionBy());
mem.putInt(structure.getTimestampIndex());
mem.putInt(tableVersion);
mem.putInt(tableId);
mem.jumpTo(TableUtils.META_OFFSET_COLUMN_TYPES);
for (int i = 0; i < count; i++) {
mem.putByte((byte) structure.getColumnType(i));
long flags = 0;
if (structure.isIndexed(i)) {
flags |= META_FLAG_BIT_INDEXED;
}
if (structure.isSequential(i)) {
flags |= META_FLAG_BIT_SEQUENTIAL;
}
if (structure.isSequential(i)) {
flags |= META_FLAG_BIT_SEQUENTIAL;
}
mem.putLong(flags);
mem.putInt(structure.getIndexBlockCapacity(i));
mem.skip(META_COLUMN_DATA_RESERVED); // reserved
}
for (int i = 0; i < count; i++) {
mem.putStr(structure.getColumnName(i));
}
mem.putLong(flags);
mem.putInt(structure.getIndexBlockCapacity(i));
mem.skip(META_COLUMN_DATA_RESERVED); // reserved
}
for (int i = 0; i < count; i++) {
mem.putStr(structure.getColumnName(i));
}
// create symbol maps
int symbolMapCount = 0;
for (int i = 0; i < count; i++) {
if (structure.getColumnType(i) == ColumnType.SYMBOL) {
SymbolMapWriter.createSymbolMapFiles(
ff,
mem,
path.trimTo(rootLen),
structure.getColumnName(i),
structure.getSymbolCapacity(i),
structure.getSymbolCacheFlag(i)
);
symbolMapCount++;
// create symbol maps
int symbolMapCount = 0;
for (int i = 0; i < count; i++) {
if (structure.getColumnType(i) == ColumnType.SYMBOL) {
SymbolMapWriter.createSymbolMapFiles(
ff,
mem,
path.trimTo(rootLen),
structure.getColumnName(i),
structure.getSymbolCapacity(i),
structure.getSymbolCacheFlag(i)
);
symbolMapCount++;
}
}
mem.of(ff, path.trimTo(rootLen).concat(TXN_FILE_NAME).$(), ff.getPageSize());
TableUtils.resetTxn(mem, symbolMapCount, 0L, INITIAL_TXN);
} finally {
if (dirFd > 0) {
if (ff.fsync(dirFd) != 0) {
LOG.error()
.$("could not fsync [fd=").$(dirFd)
.$(", errno=").$(ff.errno())
.$(']').$();
}
ff.close(dirFd);
}
}
mem.of(ff, path.trimTo(rootLen).concat(TXN_FILE_NAME).$(), ff.getPageSize());
TableUtils.resetTxn(mem, symbolMapCount, 0L, INITIAL_TXN);
} else {
throw CairoException.instance(ff.errno()).put("Could not open dir [path=").put(path).put(']');
}
}
......
......@@ -781,11 +781,12 @@ class LineTcpMeasurementScheduler implements Closeable {
}
preprocessEvent(event);
engine.createTable(
engine.createTableUnsafe(
securityContext,
appendMemory,
path,
tableStructureAdapter.of(event, this));
tableStructureAdapter.of(event, this)
);
int nValues = event.getNValues();
for (int n = 0; n < nValues; n++) {
colIndexMappings.set(n, n);
......
......@@ -208,20 +208,12 @@ public class CairoTextWriter implements Closeable, Mutable {
ObjList<TypeAdapter> detectedTypes,
CairoSecurityContext cairoSecurityContext
) throws TextException {
if (engine.lock(cairoSecurityContext, tableName)) {
try {
engine.createTable(
cairoSecurityContext,
appendMemory,
path,
tableStructureAdapter.of(names, detectedTypes)
);
} finally {
engine.unlock(cairoSecurityContext, tableName, null);
}
} else {
throw EntryUnavailableException.INSTANCE;
}
engine.createTable(
cairoSecurityContext,
appendMemory,
path,
tableStructureAdapter.of(names, detectedTypes)
);
this.types = detectedTypes;
}
......
......@@ -1405,7 +1405,7 @@ public class SqlCompiler implements Closeable {
try {
if (createTableModel.getQueryModel() == null) {
engine.createTable(executionContext.getCairoSecurityContext(), mem, path, createTableModel);
engine.createTableUnsafe(executionContext.getCairoSecurityContext(), mem, path, createTableModel);
} else {
writer = createTableFromCursor(createTableModel, executionContext);
}
......@@ -1430,7 +1430,7 @@ public class SqlCompiler implements Closeable {
typeCast.clear();
final RecordMetadata metadata = factory.getMetadata();
validateTableModelAndCreateTypeCast(model, metadata, typeCast);
engine.createTable(
engine.createTableUnsafe(
executionContext.getCairoSecurityContext(),
mem,
path,
......
......@@ -36,15 +36,7 @@ public final class Files {
public static final Charset UTF_8;
public static final long PAGE_SIZE;
// public static final int DT_UNKNOWN = 0;
// public static final int DT_FIFO = 1;
// public static final int DT_CHR = 2;
public static final int DT_DIR = 4;
// public static final int DT_BLK = 6;
// public static final int DT_REG = 8;
// public static final int DT_LNK = 10;
// public static final int DT_SOCK = 12;
// public static final int DT_WHT = 14;
public static final int MAP_RO = 1;
public static final int MAP_RW = 2;
......@@ -116,6 +108,8 @@ public final class Files {
public static native int msync(long addr, long len, boolean async);
public static native int fsync(long fd);
/**
 * Convenience overload of {@code mkdir} that accepts an {@code LPSZ} path.
 * <p>
 * Forwards {@code path.address()} and {@code mode} to the address-based
 * {@code mkdir} overload and returns that call's result unchanged.
 *
 * @param path path object whose {@code address()} is passed on
 * @param mode mode value passed through as-is
 * @return whatever the address-based {@code mkdir} overload returns
 */
public static int mkdir(LPSZ path, int mode) {
return mkdir(path.address(), mode);
}
......
......@@ -54,6 +54,8 @@ public interface FilesFacade {
int msync(long addr, long len, boolean async);
int fsync(long fd);
long getMapPageSize();
long getOpenFileCount();
......
......@@ -102,6 +102,11 @@ public class FilesFacadeImpl implements FilesFacade {
return Files.msync(addr, len, async);
}
/**
 * Delegates to {@code Files.fsync(fd)} and returns its result unchanged.
 *
 * @param fd descriptor value passed straight through to {@code Files.fsync}
 * @return the value produced by {@code Files.fsync(fd)}
 */
@Override
public int fsync(long fd) {
    final int result = Files.fsync(fd);
    return result;
}
@Override
public long getMapPageSize() {
if (mapPageSize == 0) {
......
......@@ -2954,7 +2954,7 @@
"buffer-from": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.1.tgz",
"integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==",
"integrity": "sha1-MnE7wCj3XAL9txDXx7zsHyxgcO8=",
"dev": true
},
"buffer-indexof": {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册