提交 096fb9fe 编写于 作者: V Vlad Ilyushchenko

fix(griffin): thorough testing of "asof" join and fixes for edge cases.

上级 03d2d2dd
......@@ -81,9 +81,10 @@ public class AsOfJoinNoKeyRecordCursorFactory extends AbstractRecordCursorFactor
private RecordCursor masterCursor;
private RecordCursor slaveCursor;
private Record masterRecord;
private Record slaveRecord;
private Record slaveRecB;
private Record slaveRecA;
private long slaveTimestamp = Long.MIN_VALUE;
private long lastSlaveRowID = Long.MIN_VALUE;
private long latestSlaveRowID = Long.MIN_VALUE;
public AsOfLightJoinRecordCursor(
int columnSplit,
......@@ -116,46 +117,81 @@ public class AsOfJoinNoKeyRecordCursorFactory extends AbstractRecordCursorFactor
return slaveCursor.getSymbolTable(columnIndex - columnSplit);
}
@Override
public boolean hasNext() {
final long prevSlaveRowID = lastSlaveRowID;
long lastRowId = lastSlaveRowID;
if (masterCursor.hasNext()) {
// great, we have a record no matter what
final long masterTimestamp = masterRecord.getTimestamp(masterTimestampIndex);
long slaveTimestamp = this.slaveTimestamp;
if (slaveTimestamp <= masterTimestamp) {
final Record rec = slaveCursor.getRecord();
while (slaveCursor.hasNext()) {
slaveTimestamp = rec.getTimestamp(slaveTimestampIndex);
if (slaveTimestamp > masterTimestamp) {
break;
}
lastRowId = rec.getRowId();
}
// now we have dangling slave record, which we need to hold on to
if (masterTimestamp < slaveTimestamp) {
return true;
}
nextSlave(masterTimestamp);
return true;
}
return false;
}
private void nextSlave(long masterTimestamp) {
if (slaveCursor.hasNext()) {
// check where this record falls
long slaveTimestamp = slaveRecA.getTimestamp(slaveTimestampIndex);
if (slaveTimestamp > masterTimestamp) {
positionSlaveRecB();
latestSlaveRowID = slaveRecA.getRowId();
this.slaveTimestamp = slaveTimestamp;
this.lastSlaveRowID = lastRowId;
} else {
overScrollSlave(masterTimestamp, slaveTimestamp);
}
} else {
slaveIsDone();
}
}
private void positionSlaveRecB() {
if (this.latestSlaveRowID != Long.MIN_VALUE) {
record.hasSlave(true);
slaveCursor.recordAt(slaveRecB, latestSlaveRowID);
}
}
private void slaveIsDone() {
positionSlaveRecB();
this.slaveTimestamp = Long.MAX_VALUE;
}
if (lastRowId == Long.MIN_VALUE) {
record.hasSlave(false);
private void overScrollSlave(long masterTimestamp, long slaveTimestamp) {
latestSlaveRowID = slaveRecA.getRowId();
this.slaveTimestamp = slaveTimestamp;
// scroll slave down
while (true) {
if (slaveCursor.hasNext()) {
slaveTimestamp = slaveRecA.getTimestamp(slaveTimestampIndex);
if (slaveTimestamp > masterTimestamp) {
record.hasSlave(true);
slaveCursor.recordAt(slaveRecB, latestSlaveRowID);
latestSlaveRowID = slaveRecA.getRowId();
this.slaveTimestamp = slaveTimestamp;
break;
} else {
latestSlaveRowID = slaveRecA.getRowId();
this.slaveTimestamp = slaveTimestamp;
}
} else {
record.hasSlave(true);
// positioning
if (prevSlaveRowID != lastSlaveRowID) {
slaveCursor.recordAt(slaveRecord, lastSlaveRowID);
}
slaveCursor.recordAt(slaveRecB, latestSlaveRowID);
this.slaveTimestamp = Long.MAX_VALUE;
break;
}
return true;
}
return false;
}
@Override
public void toTop() {
slaveTimestamp = Long.MIN_VALUE;
lastSlaveRowID = Long.MIN_VALUE;
latestSlaveRowID = Long.MIN_VALUE;
record.hasSlave(false);
masterCursor.toTop();
slaveCursor.toTop();
}
......@@ -167,12 +203,14 @@ public class AsOfJoinNoKeyRecordCursorFactory extends AbstractRecordCursorFactor
private void of(RecordCursor masterCursor, RecordCursor slaveCursor) {
slaveTimestamp = Long.MIN_VALUE;
lastSlaveRowID = Long.MIN_VALUE;
latestSlaveRowID = Long.MIN_VALUE;
this.masterCursor = masterCursor;
this.slaveCursor = slaveCursor;
this.masterRecord = masterCursor.getRecord();
this.slaveRecord = slaveCursor.getRecordB();
record.of(masterRecord, slaveRecord);
this.slaveRecA = slaveCursor.getRecord();
this.slaveRecB = slaveCursor.getRecordB();
record.of(masterRecord, slaveRecB);
record.hasSlave(false);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册