提交 4bcf1a7b 编写于 作者: R roblou

Organize TextSearch

上级 25350183
......@@ -29,54 +29,34 @@ export class Engine implements ISearchEngine<ISerializedFileMatch> {
// Instance state of the text-search Engine.
// NOTE(review): this span looks like a rendered diff with removed and added
// lines interleaved — several fields (isCanceled, isDone, progressed,
// walkerIsDone, fileEncoding, limitReached, workers) are declared twice.
// Confirm which declarations belong to the current revision.
private static PROGRESS_FLUSH_CHUNK_SIZE = 50; // optimization: number of files to process before emitting progress event
private rootFolders: string[];
private extraFiles: string[];
private maxResults: number;
// Raw search options as passed to the constructor.
private config: IRawSearch;
// File walker whose walk() callback feeds files into search().
private walker: FileWalker;
private isCanceled: boolean;
private isDone: boolean;
private total: number;
private worked: number;
private progressed: number;
// Error reported by the walker's completion callback; forwarded to done().
private walkerError: Error;
private walkerIsDone: boolean;
private fileEncoding: string;
private limitReached: boolean;
private workers: ISearchWorker[] = [];
// Workers that are not currently running a batch.
private readyWorkers: ISearchWorker[] = [];
private isCanceled = false;
// Set once done() has been invoked so it is only called a single time.
private isDone = false;
// Sum of the sizes of all files reported by the walker so far (a file with no size counts as 1).
private totalBytes = 0;
// Sum of the sizes that have been passed to unwind(), i.e. handled or skipped.
private processedBytes = 0;
// Number of progress() calls; used to throttle onProgress to every PROGRESS_FLUSH_CHUNK_SIZE calls.
private progressed = 0;
// Becomes true when the walker's completion callback has run.
private walkerIsDone = false;
private limitReached = false;
// Running count of match objects returned by the workers.
private numResults = 0;
private fileEncoding: string;
// Index into `workers` of the worker that receives the next batch; advanced modulo workers.length.
private nextWorker = 0;
// Batches (and their sizes) that were queued because no worker was ready.
private batches = [];
private batchSizes = [];
// Result callback handed to search().
private onResult: any;
private workers: ISearchWorker[] = [];
// One promise per batch dispatched to a worker.
private workerPromises: TPromise<void>[] = [];
/**
 * Stores the search configuration and file walker, resolves the file encoding
 * and spins up the search workers.
 *
 * NOTE(review): this span looks like a rendered diff with removed and added
 * lines interleaved (two `fileEncoding` assignments, two `const worker`
 * declarations in the loop) — confirm which lines belong to the current
 * revision before relying on this text as source.
 *
 * @param config Raw search options; `contentPattern` is handed to every worker
 *               and `fileEncoding` is used when `encodingExists` accepts it.
 * @param walker File walker kept on the instance.
 */
constructor(config: IRawSearch, walker: FileWalker) {
this.rootFolders = config.rootFolders;
this.extraFiles = config.extraFiles;
this.config = config;
this.walker = walker;
this.isCanceled = false;
this.limitReached = false;
this.maxResults = config.maxResults;
this.worked = 0;
this.progressed = 0;
this.total = 0;
// Fall back to UTF8 when the configured encoding is not recognised.
this.fileEncoding = encodingExists(config.fileEncoding) ? config.fileEncoding : UTF8;
this.fileEncoding = encodingExists(config.fileEncoding) ? config.fileEncoding : UTF8; // todo
// Spin up workers
const workerConfig: ISearchWorkerConfig = {
pattern: config.contentPattern,
id: undefined
};
// One worker per two logical CPUs, rounded up.
const numWorkers = Math.ceil(os.cpus().length/2); // /2 because of hyperthreading. Maybe make better.
// const numWorkers = 2;
for (let i = 0; i < numWorkers; i++) {
// The loop index doubles as the worker id.
workerConfig.id = i;
const worker = createWorker(workerConfig);
const worker = createWorker(i, config.contentPattern);
this.workers.push(worker);
this.readyWorkers.push(worker);
}
}
......@@ -89,16 +69,15 @@ export class Engine implements ISearchEngine<ISerializedFileMatch> {
public search(onResult: (match: ISerializedFileMatch) => void, onProgress: (progress: IProgress) => void, done: (error: Error, complete: ISerializedSearchComplete) => void): void {
let resultCounter = 0;
this.onResult = onResult;
const progress = () => {
if (++this.progressed % Engine.PROGRESS_FLUSH_CHUNK_SIZE === 0) {
onProgress({ total: this.total, worked: this.worked }); // buffer progress in chunks to reduce pressure
onProgress({ total: this.totalBytes, worked: this.processedBytes }); // buffer progress in chunks to reduce pressure
}
};
const unwind = (processed: number, iAmDone?) => {
this.worked += processed;
const unwind = (processed: number) => {
this.processedBytes += processed;
// Emit progress() unless we got canceled or hit the limit
if (processed && !this.isDone && !this.isCanceled && !this.limitReached) {
......@@ -106,8 +85,8 @@ export class Engine implements ISearchEngine<ISerializedFileMatch> {
}
// Emit done()
// console.log('unwind: ' + this.worked + '/' + this.total);
if (iAmDone && !this.isDone && this.worked === this.total) {
console.log('unwind: ' + this.worked + '/' + this.total);
if (!this.isDone && this.processedBytes === this.totalBytes && this.walkerIsDone) {
this.isDone = true;
done(this.walkerError, {
limitHit: this.limitReached,
......@@ -117,82 +96,69 @@ export class Engine implements ISearchEngine<ISerializedFileMatch> {
};
let begin = 0;
const onBatchReady = (batch: string[], batchSize): void => {
// console.log(`onBatchReady: ${batchSize}, ${this.worked}/${this.total}`);
if (this.readyWorkers.length) {
const worker = begin < 4 ? this.readyWorkers[begin++] : this.readyWorkers.pop();
run(worker, batch, batchSize);
// this.nextWorker = (this.nextWorker + 1) % this.workers.length;
} else {
this.batches.push(batch);
this.batchSizes.push(batchSize);
}
};
const run = (worker, batch, batchSize) => {
worker.search({absolutePaths: batch, maxResults: 1e8 }).then(matches => {
// console.log('got result - ' + batchSize);
const run = (batch: string[], batchBytes: number): TPromise<void> => {
console.log(`onBatchReady: ${batchBytes}, ${this.processedBytes}/${this.totalBytes}`);
const worker = this.workers[this.nextWorker];
this.nextWorker = (this.nextWorker + 1) % this.workers.length;
const batchPromise = worker.search({absolutePaths: batch, maxResults: 1e8 }).then(matches => {
console.log('got result - ' + batchBytes);
this.numResults += matches.length;
matches.forEach(m => {
if (m && m.lineMatches.length) {
onResult(m);
}
});
unwind(batchSize);
if (this.batches.length) run(worker, this.batches.shift(), this.batchSizes.shift());
else if (this.walkerIsDone) unwind(0, true);
else {
this.readyWorkers.push(worker);
}
unwind(batchBytes);
});
this.workerPromises.push(batchPromise);
return batchPromise;
}
// Walk over the file system
const files = [];
let nextBatch = [];
let nextBatchSize = 0;
let workerBatchSize = 500;
this.walker.walk(this.rootFolders, this.extraFiles, result => {
let size = result.size || 1;
// this.total += size;
let nextBatchBytes = 0;
let batchFlushBytes = 5e6;
this.walker.walk(this.config.rootFolders, this.config.extraFiles, result => {
let bytes = result.size || 1;
// If the result is empty or we have reached the limit or we are canceled, ignore it
if (this.limitReached || this.isCanceled) {
return unwind(size);
return unwind(bytes);
}
// Indicate progress to the outside
progress();
const absolutePath = result.base ? [result.base, result.relativePath].join(path.sep) : result.relativePath;
// files.push(absolutePath);
nextBatch.push(absolutePath);
nextBatchSize += size;
if (nextBatch.length >= workerBatchSize) {
this.total += nextBatchSize;
onBatchReady(nextBatch, nextBatchSize);
nextBatchBytes += bytes;
this.totalBytes += bytes;
if (nextBatchBytes >= batchFlushBytes) {
run(nextBatch, nextBatchBytes);
nextBatch = [];
nextBatchSize = 0;
nextBatchBytes = 0;
}
}, (error, isLimitHit) => {
if (nextBatch.length) {
this.total += nextBatchSize;
onBatchReady(nextBatch, nextBatchSize);
run(nextBatch, nextBatchBytes);
}
this.walkerIsDone = true;
this.walkerError = error;
// unwind(0 /* walker is done, indicate this back to our handler to be able to unwind */);
});
}
}
function createWorker(config: ISearchWorkerConfig): ISearchWorker {
config = JSON.parse(JSON.stringify(config)); // copy
function createWorker(id: number, pattern: IPatternInfo): ISearchWorker {
let client = new Client(
uri.parse(require.toUrl('bootstrap')).fsPath,
{
serverName: 'Search Worker ' + config.id,
serverName: 'Search Worker ' + id,
timeout: 60 * 60 * 1000,
args: ['--type=searchWorker'],
env: {
......@@ -205,6 +171,7 @@ function createWorker(config: ISearchWorkerConfig): ISearchWorker {
// Make async?
const channel = ipc.getNextTickChannel(client.getChannel<ISearchWorkerChannel>('searchWorker'));
const channelClient = new SearchWorkerChannelClient(channel);
const config: ISearchWorkerConfig = { pattern, id }
channelClient.initialize(config);
return channelClient;
}
\ No newline at end of file
......@@ -48,10 +48,9 @@ export class SearchWorker implements ISearchWorker {
// console.log('worker started: ' + Date.now());
this.contentPattern = strings.createRegExp(config.pattern.pattern, config.pattern.isRegExp, { matchCase: config.pattern.isCaseSensitive, wholeWord: config.pattern.isWordMatch, multiline: false, global: true });
this.config = config;
console.log(config);
if (config.id === 0) {
console.log('startProfiling');
profiler.startProfiling('p1');
// console.log('startProfiling');
// profiler.startProfiling('p1');
}
return TPromise.wrap<void>(undefined);
......@@ -62,10 +61,10 @@ export class SearchWorker implements ISearchWorker {
}
search(args: ISearchWorkerSearchArgs): TPromise<FileMatch[]> {
// console.log('starting search: ' + Date.now() + ' ' + args.absolutePaths.length);
console.log('starting search: ' + Date.now() + ' ' + args.absolutePaths.length);
return this.nextSearch = this.nextSearch.then(() => {
// console.log('starting batch: ' + Date.now() + ' ' + args.absolutePaths.length);
console.log('starting batch: ' + Date.now() + ' ' + args.absolutePaths.length);
this.paths = args.absolutePaths;
this.running = true;
this.results = [];
......@@ -89,18 +88,18 @@ export class SearchWorker implements ISearchWorker {
this.start(this.paths.pop());
} else if (this.running) {
this.running = false;
console.log(`done`);
this.finalCallback(this.results.filter(r => !!r));
console.log(`done ${this.nResults} ${this.config.id}`)
if (this.nResults === 0 && this.config.id === 0) {
this.nResults++;
console.log('stopProfiling');
const profile = profiler.stopProfiling('p1');
profile.export(function(error, result) {
console.log(error);
fs.writeFileSync('p1.cpuprofile', result);
profile.delete();
});
// console.log('stopProfiling');
// const profile = profiler.stopProfiling('p1');
// profile.export(function(error, result) {
// console.log(error);
// fs.writeFileSync('p1.cpuprofile', result);
// profile.delete();
// });
}
}
})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册