提交 410ce5c9 编写于 作者: V Vlad Ilyushchenko

CUTLASS: basic support of TIMESTAMP column import

上级 38207677
......@@ -60,6 +60,7 @@ public final class Kqueue implements Closeable {
if (Net.close(this.kq) < 0) {
LOG.error().$("Cannot close kqueue ").$(this.kq).$();
}
Unsafe.free(this.eventList, SIZEOF_KEVENT * (long) capacity);
}
public long getData() {
......
......@@ -55,4 +55,8 @@ public interface NetworkFacade {
long socketTcp(boolean blocking);
long socketUdp();
boolean bindUdp(long fd, CharSequence address, int port);
boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address);
}
......@@ -105,4 +105,14 @@ public class NetworkFacadeImpl implements NetworkFacade {
public long socketUdp() {
return Net.socketUdp();
}
@Override
public boolean bindUdp(long fd, CharSequence address, int port) {
return Net.bindUdp(fd, address, port);
}
@Override
public boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address) {
return Net.join(fd, bindIPv4Address, bindIPv4Address);
}
}
......@@ -60,6 +60,7 @@ public final class Kqueue implements Closeable {
if (Net.close(this.kq) < 0) {
LOG.error().$("Cannot close kqueue ").$(this.kq).$();
}
Unsafe.free(this.eventList, SIZEOF_KEVENT * (long) capacity);
}
public long getData() {
......
......@@ -16,7 +16,7 @@
],
"timestamp": [
{
"format": "yyyy-MM-ddTHH:mm:ss.SSSz"
"format": "yyyy-MM-ddTHH:mm:ss.SSSUUUz"
}
]
}
\ No newline at end of file
......@@ -947,6 +947,44 @@ public class PlainTextImportTest extends AbstractCairoTest {
});
}
@Test
public void testImportTimestamp() throws Exception {
assertNoLeak(new DefaultTextConfiguration() {
@Override
public int getTextAnalysisMaxLines() {
return 3;
}
},
textLoader -> {
String expected = "StrSym\tts\n" +
"CMP1\t2015-01-13T19:15:09.000000Z\n" +
"CMP2\t2015-01-13T19:15:09.000234Z\n" +
"CMP1\t2015-01-13T19:15:09.000455Z\n" +
"CMP2\t2015-01-13T19:15:09.000754Z\n" +
"CMP1\t2015-01-13T19:15:09.000903Z\n";
String csv = "StrSym,ts\n" +
"CMP1,2015-01-13T19:15:09.000000Z\n" +
"CMP2,2015-01-13T19:15:09.000234Z\n" +
"CMP1,2015-01-13T19:15:09.000455Z\n" +
"CMP2,2015-01-13T19:15:09.000754Z\n" +
"CMP1,2015-01-13T19:15:09.000903Z\n";
configureLoaderDefaults(textLoader, (byte) ',');
textLoader.setForceHeaders(false);
playText(
textLoader,
csv,
1024,
expected,
"{\"columnCount\":2,\"columns\":[{\"index\":0,\"name\":\"StrSym\",\"type\":\"STRING\"},{\"index\":1,\"name\":\"ts\",\"type\":\"TIMESTAMP\"}],\"timestampIndex\":-1}",
5,
5
);
});
}
@Test
public void testLineRoll() throws Exception {
assertNoLeak(textLoader -> {
......
......@@ -81,7 +81,7 @@ public class ConcurrentTest {
CyclicBarrier barrier = new CyclicBarrier(3);
CountDownLatch latch = new CountDownLatch(2);
BusyConsumer consumers[] = new BusyConsumer[2];
BusyConsumer[] consumers = new BusyConsumer[2];
consumers[0] = new BusyConsumer(size, subSeq, queue, barrier, latch);
consumers[1] = new BusyConsumer(size, subSeq, queue, barrier, latch);
......@@ -108,7 +108,7 @@ public class ConcurrentTest {
latch.await();
int buf[] = new int[size];
int[] buf = new int[size];
System.arraycopy(consumers[0].buf, 0, buf, 0, consumers[0].finalIndex);
System.arraycopy(consumers[1].buf, 0, buf, consumers[0].finalIndex, consumers[1].finalIndex);
Arrays.sort(buf);
......@@ -130,7 +130,7 @@ public class ConcurrentTest {
CyclicBarrier barrier = new CyclicBarrier(3);
CountDownLatch latch = new CountDownLatch(2);
WaitingConsumer consumers[] = new WaitingConsumer[2];
WaitingConsumer[] consumers = new WaitingConsumer[2];
consumers[0] = new WaitingConsumer(size, subSeq, queue, barrier, latch);
consumers[1] = new WaitingConsumer(size, subSeq, queue, barrier, latch);
......@@ -139,21 +139,18 @@ public class ConcurrentTest {
barrier.await();
int i = 0;
while (true) {
do {
long cursor = pubSeq.nextBully();
queue.get(cursor).value = i++;
pubSeq.done(cursor);
if (i == size) {
break;
}
}
} while (i != size);
publishEOE(queue, pubSeq);
publishEOE(queue, pubSeq);
latch.await();
int buf[] = new int[size];
int[] buf = new int[size];
System.arraycopy(consumers[0].buf, 0, buf, 0, consumers[0].finalIndex);
System.arraycopy(consumers[1].buf, 0, buf, consumers[0].finalIndex, consumers[1].finalIndex);
Arrays.sort(buf);
......@@ -196,7 +193,7 @@ public class ConcurrentTest {
latch.await();
int buf[] = consumer.buf;
int[] buf = consumer.buf;
for (i = 0; i < buf.length; i++) {
Assert.assertEquals(i, buf[i]);
}
......@@ -220,20 +217,17 @@ public class ConcurrentTest {
barrier.await();
int i = 0;
while (true) {
do {
long cursor = pubSeq.nextBully();
queue.get(cursor).value = i++;
pubSeq.done(cursor);
if (i == size) {
break;
}
}
} while (i != size);
publishEOE(queue, pubSeq);
latch.await();
int buf[] = consumer.buf;
int[] buf = consumer.buf;
for (i = 0; i < buf.length; i++) {
Assert.assertEquals(i, buf[i]);
}
......@@ -253,7 +247,7 @@ public class ConcurrentTest {
CyclicBarrier barrier = new CyclicBarrier(3);
CountDownLatch latch = new CountDownLatch(2);
BusyConsumer consumers[] = new BusyConsumer[2];
BusyConsumer[] consumers = new BusyConsumer[2];
consumers[0] = new BusyConsumer(size, sub1, queue, barrier, latch);
consumers[1] = new BusyConsumer(size, sub2, queue, barrier, latch);
......@@ -303,7 +297,7 @@ public class ConcurrentTest {
CyclicBarrier barrier = new CyclicBarrier(4);
CountDownLatch latch = new CountDownLatch(3);
BusyConsumer consumers[] = new BusyConsumer[2];
BusyConsumer[] consumers = new BusyConsumer[2];
consumers[0] = new BusyConsumer(size, sub1, queue, barrier, latch);
consumers[1] = new BusyConsumer(size, sub2, queue, barrier, latch);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册