提交 728f1e8e 编写于 作者: N nishantmonu51

fix exists check with compression

上级 01e84f10
......@@ -128,7 +128,7 @@ public class DetermineHashedPartitionsJob implements Jobby
if (!config.getSegmentGranularIntervals().isPresent()) {
final Path intervalInfoPath = config.makeIntervalInfoPath();
fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
if (!fileSystem.exists(intervalInfoPath)) {
if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) {
throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
}
List<Interval> intervals = config.jsonMapper.readValue(
......@@ -148,7 +148,7 @@ public class DetermineHashedPartitionsJob implements Jobby
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
}
if (fileSystem.exists(partitionInfoPath)) {
if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) {
Long cardinality = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
{
......
......@@ -215,7 +215,7 @@ public class DeterminePartitionsJob implements Jobby
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
}
if (fileSystem.exists(partitionInfoPath)) {
if (Utils.exists(dimSelectionJob, fileSystem, partitionInfoPath)) {
List<ShardSpec> specs = config.jsonMapper.readValue(
Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
{
......
......@@ -65,7 +65,8 @@ public class Utils
return retVal;
}
public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting) throws IOException
public static OutputStream makePathAndOutputStream(JobContext job, Path outputPath, boolean deleteExisting)
throws IOException
{
OutputStream retVal;
FileSystem fs = outputPath.getFileSystem(job.getConfiguration());
......@@ -73,8 +74,7 @@ public class Utils
if (fs.exists(outputPath)) {
if (deleteExisting) {
fs.delete(outputPath, false);
}
else {
} else {
throw new ISE("outputPath[%s] must not exist.", outputPath);
}
}
......@@ -97,6 +97,17 @@ public class Utils
return openInputStream(inputPath, inputPath.getFileSystem(job.getConfiguration()));
}
public static boolean exists(JobContext job, FileSystem fs, Path inputPath) throws IOException
{
if (!FileOutputFormat.getCompressOutput(job)) {
return fs.exists(inputPath);
} else {
Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job.getConfiguration());
return fs.exists(new Path(inputPath.toString() + codec.getDefaultExtension()));
}
}
public static InputStream openInputStream(Path inputPath, final FileSystem fileSystem) throws IOException
{
return fileSystem.open(inputPath);
......@@ -109,7 +120,9 @@ public class Utils
return jsonMapper.readValue(
fs.open(statsPath),
new TypeReference<Map<String, Object>>(){}
new TypeReference<Map<String, Object>>()
{
}
);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册