提交 9621b694 编写于 作者: V Vlad Ilyushchenko

Misc: fix for get/set lastModified on Windows. Clock change highlighted...

Misc: fix for get/set lastModified on Windows. Clock change highlighted inconsistent behaviour, which I fixed in this commit.
CUTLASS: fixed TextImportProcessor resume function + test
Misc: Files.exists() does not use Files.getLastModified() anymore
上级 95b6c454
......@@ -78,5 +78,5 @@ include(${ZLIB_SOURCE_DIR}/CMakeLists.txt)
add_library(questdb SHARED ${SOURCE_FILES} ${ZLIB_SRCS})
if (WIN32)
target_link_libraries(questdb wsock32 ws2_32 secur32)
target_link_libraries(questdb wsock32 ws2_32 secur32 shlwapi)
endif (WIN32)
......@@ -23,12 +23,13 @@
#define _WIN32_WINNT 0x600 /* GetFileInformationByHandleEx is Vista+ */
#include <shlwapi.h>
#include <unistd.h>
#include <sys/stat.h>
#include <minwindef.h>
#include <fileapi.h>
typedef HANDLE HWND;
//typedef HANDLE HWND;
#include <winbase.h>
#include <direct.h>
......@@ -76,6 +77,8 @@ JNIEXPORT jlong JNICALL Java_com_questdb_std_Files_read
return 0;
}
#define MILLIS_SINCE_1970 11644473600000
JNIEXPORT jlong JNICALL Java_com_questdb_std_Files_sequentialRead
(JNIEnv *e, jclass cl, jlong fd, jlong address, jint len) {
DWORD count;
......@@ -88,32 +91,40 @@ JNIEXPORT jlong JNICALL Java_com_questdb_std_Files_sequentialRead
}
JNIEXPORT jlong JNICALL Java_com_questdb_std_Files_getLastModified
(JNIEnv *e, jclass cl, jlong pchar) {
TIME_ZONE_INFORMATION tz;
LONG bias;
switch (GetTimeZoneInformation(&tz)) {
case TIME_ZONE_ID_STANDARD:
bias = tz.StandardBias;
break;
case TIME_ZONE_ID_DAYLIGHT:
bias = tz.DaylightBias;
break;
default:
bias = 0;
}
if (bias != 0) {
bias *= 60000L;
}
struct stat st;
int r = stat((const char *) pchar, &st);
if (r == 0) {
return (1000 * (jlong) st.st_mtime) + bias;
(JNIEnv *e, jclass cl, jlong lpszName) {
HANDLE handle = CreateFile(
(LPCSTR) lpszName,
FILE_WRITE_ATTRIBUTES,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL
);
if (handle == INVALID_HANDLE_VALUE) {
SaveLastError();
return -1;
}
FILETIME creation;
FILETIME access;
FILETIME write;
if (GetFileTime(handle, &creation, &access, &write)) {
CloseHandle(handle);
return ((((jlong)write.dwHighDateTime) << 32) | write.dwLowDateTime) / 10000 - MILLIS_SINCE_1970;
}
CloseHandle(handle);
SaveLastError();
return r;
return -1;
}
JNIEXPORT jboolean JNICALL
Java_com_questdb_std_Files_exists0
(JNIEnv *e, jclass cl, jlong lpszName) {
return PathFileExistsA((LPCSTR) lpszName);
}
JNIEXPORT jboolean JNICALL Java_com_questdb_std_Files_setLastModified
......@@ -133,24 +144,7 @@ JNIEXPORT jboolean JNICALL Java_com_questdb_std_Files_setLastModified
return 0;
}
LONG bias;
TIME_ZONE_INFORMATION tz;
switch (GetTimeZoneInformation(&tz)) {
case TIME_ZONE_ID_STANDARD:
bias = tz.StandardBias;
break;
case TIME_ZONE_ID_DAYLIGHT:
bias = tz.DaylightBias;
break;
default:
bias = 0;
}
if (bias != 0) {
bias *= 60000L;
}
millis += bias;
millis += 11644477200000;
millis += MILLIS_SINCE_1970; // millis between 1601-01-01 and 1970-01-01
millis *= 10000;
FILETIME t;
t.dwHighDateTime = (DWORD) ((millis >> 32) & 0xFFFFFFFF);
......
......@@ -71,6 +71,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
@Override
public void clear() {
LOG.debug().$("clear").$();
this.headerParser.clear();
this.multipartContentParser.clear();
this.multipartContentParser.clear();
......@@ -205,6 +206,8 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
return;
}
// dump(recvBuffer, read);
headerEnd = headerParser.parse(recvBuffer, recvBuffer + read, true);
}
} else {
......@@ -266,8 +269,9 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
break;
}
if (n == 0) {
// dump(buf, n);
if (n == 0) {
// Text loader needs as big of a data chunk as possible
// to analyse columns and delimiters correctly. To make sure we
// can deliver large data chunk we have to implement mini-Nagle
......@@ -291,6 +295,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
continue;
}
LOG.debug().$("peer is slow [multipart]").$();
dispatcher.registerChannel(this, IOOperation.READ);
break;
......
......@@ -37,8 +37,8 @@ import java.io.Closeable;
public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
private static final Log LOG = LogFactory.getLog(StaticContentProcessor.class);
private static final LocalValue<StaticContentProcessorState> LV = new LocalValue<>();
private final MimeTypesCache mimeTypes;
private final LocalValue<StaticContentProcessorState> stateAccessor = new LocalValue<>();
private final HttpRangeParser rangeParser = new HttpRangeParser();
private final PrefixedPath prefixedPath;
private final CharSequence indexFileName;
......@@ -94,7 +94,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
@Override
public void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("resumeSend").$();
StaticContentProcessorState state = stateAccessor.get(context);
StaticContentProcessorState state = LV.get(context);
if (state == null || state.fd == -1) {
return;
......@@ -171,9 +171,9 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
boolean asAttachment) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (rangeParser.of(range)) {
StaticContentProcessorState state = stateAccessor.get(context);
StaticContentProcessorState state = LV.get(context);
if (state == null) {
stateAccessor.set(context, state = new StaticContentProcessorState());
LV.set(context, state = new StaticContentProcessorState());
}
state.fd = ff.openRO(path);
......@@ -226,9 +226,9 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
LOG.info().$("Cannot open file: ").$(path).$('(').$(ff.errno()).$(')').$();
sendStatusWithDefaultMessage(context, dispatcher, 404);
} else {
StaticContentProcessorState h = stateAccessor.get(context);
StaticContentProcessorState h = LV.get(context);
if (h == null) {
stateAccessor.set(context, h = new StaticContentProcessorState());
LV.set(context, h = new StaticContentProcessorState());
}
h.fd = fd;
h.bytesSent = 0;
......
......@@ -57,7 +57,16 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
private static final CharSequence CONTENT_TYPE_TEXT = "text/plain; charset=utf-8";
private static final CharSequence CONTENT_TYPE_JSON = "application/json; charset=utf-8";
private static final CharSequenceIntHashMap atomicityParamMap = new CharSequenceIntHashMap();
private final LocalValue<TextImportProcessorState> lvContext = new LocalValue<>();
// Local value has to be static because each thread will have its own instance of
// processor. For different threads to lookup the same value from local value map the key,
// which is LV, has to be the same between processor instances
private static final LocalValue<TextImportProcessorState> LV = new LocalValue<>();
static {
atomicityParamMap.put("relaxed", Atomicity.SKIP_ROW);
atomicityParamMap.put("strict", Atomicity.SKIP_ALL);
}
private final TextImportProcessorConfiguration configuration;
private final CairoEngine engine;
private HttpConnectionContext transientContext;
......@@ -72,118 +81,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
this.engine = cairoEngine;
}
@Override
public void close() {
}
@Override
public void onChunk(HttpRequestHeader partHeader, long lo, long hi) {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, (int) (hi - lo));
if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) {
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (JsonException e) {
// todo: reply something sensible
e.printStackTrace();
}
}
}
@Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$();
if (Chars.equals("data", partHeader.getContentDispositionName())) {
final HttpRequestHeader rh = transientContext.getRequestHeader();
CharSequence name = rh.getUrlParam("name");
if (name == null) {
name = partHeader.getContentDispositionFilename();
}
if (name == null) {
transientContext.simpleResponse().sendStatus(400, "no name given");
// we have to disconnect to interrupt potentially large upload
transientDispatcher.disconnect(transientContext, DisconnectReason.SILLY);
return;
}
transientState.analysed = false;
transientState.textLoader.configureDestination(
name,
Chars.equalsNc("true", rh.getUrlParam("overwrite")),
Chars.equalsNc("true", rh.getUrlParam("durable")),
// todo: these values are incorrect, but ok for now
getAtomicity(rh.getUrlParam("atomicity"))
);
transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader")));
transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader"));
transientState.messagePart = MESSAGE_DATA;
} else if (Chars.equals("schema", partHeader.getContentDispositionName())) {
transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA);
transientState.messagePart = MESSAGE_SCHEMA;
} else {
// todo: disconnect
transientState.messagePart = MESSAGE_UNKNOWN;
}
}
// This processor implements HttpMultipartContentListener, methods of which
// have neither context nor dispatcher. During "chunk" processing we may need
// to send something back to client, or disconnect them. To do that we need
// these transient references. resumeRecv() will set them and they will remain
// valid during multipart events.
@Override
public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
LOG.debug().$("part end").$();
transientState.textLoader.wrapUp();
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (JsonException e) {
handleJsonException(e);
}
}
@Override
public void onHeadersReady(HttpConnectionContext context) {
}
@Override
public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
transientState.clear();
context.clear();
dispatcher.registerChannel(context, IOOperation.READ);
}
@Override
public void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
this.transientContext = context;
this.transientDispatcher = dispatcher;
this.transientState = lvContext.get(context);
if (this.transientState == null) {
try {
lvContext.set(context, this.transientState = new TextImportProcessorState(configuration.getTextConfiguration(), engine));
} catch (JsonException e) {
// todo: handle gracefully
e.printStackTrace();
}
}
}
@Override
public void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowToReadException {
doResumeSend(lvContext.get(context), context.getChunkedResponseSocket());
}
private static void resumeJson(TextImportProcessorState state, HttpResponseSink.ChunkedResponseImpl r) throws PeerDisconnectedException, PeerIsSlowToReadException {
final TextLoader textLoader = state.textLoader;
final RecordMetadata m = textLoader.getMetadata();
......@@ -257,6 +154,12 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return b;
}
// This processor implements HttpMultipartContentListener, methods of which
// have neither context nor dispatcher. During "chunk" processing we may need
// to send something back to client, or disconnect them. To do that we need
// these transient references. resumeRecv() will set them and they will remain
// valid during multipart events.
private static void pad(CharSink b, int w, long value) {
int len = (int) Math.log10(value);
if (len < 0) {
......@@ -355,6 +258,113 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return atomicity == -1 ? Atomicity.SKIP_COL : atomicity;
}
@Override
public void close() {
}
@Override
public void onChunk(HttpRequestHeader partHeader, long lo, long hi) {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, (int) (hi - lo));
if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) {
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (JsonException e) {
// todo: reply something sensible
e.printStackTrace();
}
}
}
@Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$();
if (Chars.equals("data", partHeader.getContentDispositionName())) {
final HttpRequestHeader rh = transientContext.getRequestHeader();
CharSequence name = rh.getUrlParam("name");
if (name == null) {
name = partHeader.getContentDispositionFilename();
}
if (name == null) {
transientContext.simpleResponse().sendStatus(400, "no name given");
// we have to disconnect to interrupt potentially large upload
transientDispatcher.disconnect(transientContext, DisconnectReason.SILLY);
return;
}
transientState.analysed = false;
transientState.textLoader.configureDestination(
name,
Chars.equalsNc("true", rh.getUrlParam("overwrite")),
Chars.equalsNc("true", rh.getUrlParam("durable")),
// todo: these values are incorrect, but ok for now
getAtomicity(rh.getUrlParam("atomicity"))
);
transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader")));
transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader"));
transientState.messagePart = MESSAGE_DATA;
} else if (Chars.equals("schema", partHeader.getContentDispositionName())) {
transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA);
transientState.messagePart = MESSAGE_SCHEMA;
} else {
// todo: disconnect
transientState.messagePart = MESSAGE_UNKNOWN;
}
}
@Override
public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
LOG.debug().$("part end").$();
transientState.textLoader.wrapUp();
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (JsonException e) {
handleJsonException(e);
}
}
@Override
public void onHeadersReady(HttpConnectionContext context) {
}
@Override
public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
transientState.clear();
context.clear();
dispatcher.registerChannel(context, IOOperation.READ);
}
@Override
public void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
this.transientContext = context;
this.transientDispatcher = dispatcher;
this.transientState = LV.get(context);
if (this.transientState == null) {
try {
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(configuration.getTextConfiguration(), engine));
} catch (JsonException e) {
// todo: handle gracefully
e.printStackTrace();
}
}
}
@Override
public void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowToReadException {
doResumeSend(LV.get(context), context.getChunkedResponseSocket());
}
private void doResumeSend(TextImportProcessorState state, HttpResponseSink.ChunkedResponseImpl response) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
......@@ -403,7 +413,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
}
private void sendResponse(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
TextImportProcessorState state = lvContext.get(context);
TextImportProcessorState state = LV.get(context);
// todo: may be set this up when headers are ready?
state.json = Chars.equalsNc("json", context.getRequestHeader().getUrlParam("fmt"));
HttpResponseSink.ChunkedResponseImpl response = context.getChunkedResponseSocket();
......@@ -420,9 +430,4 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
sendError(context, state.stateMessage, state.json);
}
}
static {
atomicityParamMap.put("relaxed", Atomicity.SKIP_ROW);
atomicityParamMap.put("strict", Atomicity.SKIP_ALL);
}
}
......@@ -200,6 +200,7 @@ public class TextLoader implements Closeable, Mutable {
}
public void setState(int state) {
LOG.debug().$("state change [old=").$(this.state).$(", new=").$(state).$(']').$();
this.state = state;
jsonLexer.clear();
}
......
......@@ -40,7 +40,7 @@ public final class Files {
public static final int DT_DIR = 4;
// public static final int DT_BLK = 6;
// public static final int DT_REG = 8;
public static final int DT_LNK = 10;
// public static final int DT_LNK = 10;
// public static final int DT_SOCK = 12;
// public static final int DT_WHT = 14;
......@@ -83,9 +83,11 @@ public final class Files {
public static native boolean exists(long fd);
public static boolean exists(LPSZ lpsz) {
return lpsz != null && getLastModified(lpsz) != -1;
return lpsz != null && exists0(lpsz.address());
}
private static native boolean exists0(long lpsz);
public native static void findClose(long findPtr);
public static long findFirst(LPSZ lpsz) {
......
......@@ -24,7 +24,6 @@
package com.questdb.std;
import java.io.Closeable;
import java.lang.ref.WeakReference;
public class LocalValueMap implements Closeable, Mutable {
......@@ -265,13 +264,12 @@ public class LocalValueMap implements Closeable, Mutable {
threshold = len * 2 / 3;
}
static class Entry extends WeakReference<LocalValue<?>> {
static class Entry {
Object value;
LocalValue<?> k;
Entry(LocalValue<?> k, Object v) {
super(k);
value = v;
this.k = k;
}
......
......@@ -131,14 +131,18 @@ public class FilesTest {
@Test
public void testLastModified() throws IOException, NumericException {
try (Path path = new Path()) {
File f = temporaryFolder.newFile();
Assert.assertTrue(Files.touch(path.of(f.getAbsolutePath()).$()));
long t = DateFormatUtils.parseDateTime("2015-10-17T10:00:00.000Z");
Assert.assertTrue(Files.setLastModified(path, t));
Assert.assertEquals(t, Files.getLastModified(path));
assertLastModified(path, DateFormatUtils.parseDateTime("2015-10-17T10:00:00.000Z"));
assertLastModified(path, 122222212222L);
}
}
private void assertLastModified(Path path, long t) throws IOException {
File f = temporaryFolder.newFile();
Assert.assertTrue(Files.touch(path.of(f.getAbsolutePath()).$()));
Assert.assertTrue(Files.setLastModified(path, t));
Assert.assertEquals(t, Files.getLastModified(path));
}
@Test
public void testListDir() {
String temp = temporaryFolder.getRoot().getAbsolutePath();
......
......@@ -561,7 +561,6 @@ public class IODispatcherTest {
}
@Test
@Ignore
public void testImportMultipleOnSameConnectionSlow() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
......@@ -686,7 +685,6 @@ public class IODispatcherTest {
long ptr = Unsafe.malloc(((CharSequence) request).length());
try {
for (int j = 0; j < 5; j++) {
System.out.println(j);
int sent = 0;
Chars.strcpy(request, ((CharSequence) request).length(), ptr);
while (sent < len) {
......@@ -874,7 +872,7 @@ public class IODispatcherTest {
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Content-Length: 20971520\r\n" +
"Content-Type: text/plain\r\n" +
"ETag: \"122222212000\"\r\n" + // this is last modified timestamp on the file, we set this value when we created file
"ETag: \"122222212222\"\r\n" + // this is last modified timestamp on the file, we set this value when we created file
"\r\n";
for (int j = 0; j < 10; j++) {
......@@ -898,7 +896,7 @@ public class IODispatcherTest {
"User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36\r\n" +
"Accept-Encoding: gzip,deflate,sdch\r\n" +
"Accept-Language: en-US,en;q=0.8\r\n" +
"If-None-Match: \"122222212000\"\r\n" + // this header should make static processor return 304
"If-None-Match: \"122222212222\"\r\n" + // this header should make static processor return 304
"Cookie: textwrapon=false; textautoformat=false; wysiwyg=textarea\r\n" +
"\r\n";
......@@ -1058,7 +1056,7 @@ public class IODispatcherTest {
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Content-Length: 20971520\r\n" +
"Content-Type: text/plain\r\n" +
"ETag: \"122222212000\"\r\n" + // this is last modified timestamp on the file, we set this value when we created file
"ETag: \"122222212222\"\r\n" + // this is last modified timestamp on the file, we set this value when we created file
"\r\n";
for (int j = 0; j < 10; j++) {
......@@ -1075,7 +1073,7 @@ public class IODispatcherTest {
"User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36\r\n" +
"Accept-Encoding: gzip,deflate,sdch\r\n" +
"Accept-Language: en-US,en;q=0.8\r\n" +
"If-None-Match: \"122222212000\"\r\n" + // this header should make static processor return 304
"If-None-Match: \"122222212222\"\r\n" + // this header should make static processor return 304
"Cookie: textwrapon=false; textautoformat=false; wysiwyg=textarea\r\n" +
"\r\n";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册