WorkCoordinator.LowPriorityProcessor.cs 10.2 KB
Newer Older
S
Sam Harwell 已提交
1
// Copyright (c) Microsoft.  All Rights Reserved.  Licensed under the Apache License, Version 2.0.  See License.txt in the project root for license information.
2 3 4 5 6 7 8 9 10 11 12 13 14 15

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.ErrorReporting;
using Microsoft.CodeAnalysis.Internal.Log;
using Microsoft.CodeAnalysis.Notification;
using Microsoft.CodeAnalysis.Shared.TestHooks;
using Roslyn.Utilities;

namespace Microsoft.CodeAnalysis.SolutionCrawler
{
16
    internal sealed partial class SolutionCrawlerRegistrationService
17 18 19 20 21
    {
        private sealed partial class WorkCoordinator
        {
            private sealed partial class IncrementalAnalyzerProcessor
            {
22
                private sealed class LowPriorityProcessor : AbstractPriorityProcessor
23
                {
24
                    private readonly AsyncProjectWorkItemQueue _workItemQueue;
25 26 27 28 29 30 31 32

                    public LowPriorityProcessor(
                        IAsynchronousOperationListener listener,
                        IncrementalAnalyzerProcessor processor,
                        Lazy<ImmutableArray<IIncrementalAnalyzer>> lazyAnalyzers,
                        IGlobalOperationNotificationService globalOperationNotificationService,
                        int backOffTimeSpanInMs,
                        CancellationToken shutdownToken) :
33
                        base(listener, processor, lazyAnalyzers, globalOperationNotificationService, backOffTimeSpanInMs, shutdownToken)
34
                    {
35
                        _workItemQueue = new AsyncProjectWorkItemQueue(processor._registration.ProgressReporter, processor._registration.Workspace);
36 37 38 39 40 41

                        Start();
                    }

                    protected override Task WaitAsync(CancellationToken cancellationToken)
                    {
42
                        return _workItemQueue.WaitAsync(cancellationToken);
43 44 45 46 47 48 49
                    }

                    protected override async Task ExecuteAsync()
                    {
                        try
                        {
                            // we wait for global operation, higher and normal priority processor to finish its working
H
Heejae Chang 已提交
50
                            await WaitForHigherPriorityOperationsAsync().ConfigureAwait(false);
51

52
                            // process any available project work, preferring the active project.
53
                            if (_workItemQueue.TryTakeAnyWork(
54
                                this.Processor.GetActiveProject(), this.Processor.DependencyGraph, this.Processor.DiagnosticAnalyzerService,
C
CyrusNajmabadi 已提交
55
                                out var workItem, out var projectCancellation))
56 57 58 59
                            {
                                await ProcessProjectAsync(this.Analyzers, workItem, projectCancellation).ConfigureAwait(false);
                            }
                        }
60
                        catch (Exception e) when (FatalError.ReportUnlessCanceled(e))
61 62 63
                        {
                            throw ExceptionUtilities.Unreachable;
                        }
64
                    }
65

H
Heejae Chang 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
                    protected override Task HigherQueueOperationTask
                    {
                        get
                        {
                            return Task.WhenAll(this.Processor._highPriorityProcessor.Running, this.Processor._normalPriorityProcessor.Running);
                        }
                    }

                    protected override bool HigherQueueHasWorkItem
                    {
                        get
                        {
                            return this.Processor._highPriorityProcessor.HasAnyWork || this.Processor._normalPriorityProcessor.HasAnyWork;
                        }
                    }

82 83
                    protected override void PauseOnGlobalOperation()
                    {
84 85
                        base.PauseOnGlobalOperation();

86 87 88
                        _workItemQueue.RequestCancellationOnRunningTasks();
                    }

89 90 91 92 93
                    public void Enqueue(WorkItem item)
                    {
                        this.UpdateLastAccessTime();

                        // Project work
94
                        item = item.With(documentId: null, projectId: item.ProjectId, asyncToken: this.Processor._listener.BeginAsyncOperation("WorkItem"));
95

96
                        var added = _workItemQueue.AddOrReplace(item);
97

H
Heejae Chang 已提交
98 99 100 101
                        // lower priority queue gets lowest time slot possible. if there is any activity going on in higher queue, it drop whatever it has
                        // and let higher work item run
                        CancelRunningTaskIfHigherQueueHasWorkItem();

102
                        Logger.Log(FunctionId.WorkCoordinator_Project_Enqueue, s_enqueueLogger, Environment.TickCount, item.ProjectId, !added);
103

104
                        SolutionCrawlerLogger.LogWorkItemEnqueue(this.Processor._logAggregator, item.ProjectId);
105 106
                    }

H
Heejae Chang 已提交
107 108 109 110 111 112 113 114 115 116
                    private void CancelRunningTaskIfHigherQueueHasWorkItem()
                    {
                        if (!HigherQueueHasWorkItem)
                        {
                            return;
                        }

                        _workItemQueue.RequestCancellationOnRunningTasks();
                    }

117
                    private async Task ProcessProjectAsync(ImmutableArray<IIncrementalAnalyzer> analyzers, WorkItem workItem, CancellationToken cancellationToken)
118 119 120 121 122 123 124 125 126 127 128 129 130
                    {
                        if (this.CancellationToken.IsCancellationRequested)
                        {
                            return;
                        }

                        // we do have work item for this project
                        var projectId = workItem.ProjectId;
                        var processedEverything = false;
                        var processingSolution = this.Processor.CurrentSolution;

                        try
                        {
131
                            using (Logger.LogBlock(FunctionId.WorkCoordinator_ProcessProjectAsync, w => w.ToString(), workItem, cancellationToken))
132 133 134 135
                            {
                                var project = processingSolution.GetProject(projectId);
                                if (project != null)
                                {
136 137 138
                                    var reasons = workItem.InvocationReasons;
                                    var semanticsChanged = reasons.Contains(PredefinedInvocationReasons.SemanticChanged) ||
                                                           reasons.Contains(PredefinedInvocationReasons.SolutionRemoved);
139

140
                                    using (Processor.EnableCaching(project.Id))
141
                                    {
142
                                        await Processor.RunAnalyzersAsync(analyzers, project, (a, p, c) => a.AnalyzeProjectAsync(p, semanticsChanged, reasons, c), cancellationToken).ConfigureAwait(false);
143 144 145 146
                                    }
                                }
                                else
                                {
147
                                    SolutionCrawlerLogger.LogProcessProjectNotExist(this.Processor._logAggregator);
148 149 150 151

                                    RemoveProject(projectId);
                                }

152 153 154 155 156
                                if (!cancellationToken.IsCancellationRequested)
                                {
                                    processedEverything = true;
                                }
                            }
157
                        }
158
                        catch (Exception e) when (FatalError.ReportUnlessCanceled(e))
159 160 161 162 163 164 165
                        {
                            throw ExceptionUtilities.Unreachable;
                        }
                        finally
                        {
                            // we got cancelled in the middle of processing the project.
                            // let's make sure newly enqueued work item has all the flag needed.
166 167 168
                            // Avoid retry attempts after cancellation is requested, since work will not be processed
                            // after that point.
                            if (!processedEverything && !CancellationToken.IsCancellationRequested)
169
                            {
170
                                _workItemQueue.AddOrReplace(workItem.Retry(this.Listener.BeginAsyncOperation("ReenqueueWorkItem")));
171 172
                            }

173
                            SolutionCrawlerLogger.LogProcessProject(this.Processor._logAggregator, projectId.Id, processedEverything);
174 175

                            // remove one that is finished running
176
                            _workItemQueue.MarkWorkItemDoneFor(projectId);
177 178 179 180 181 182 183 184 185 186 187 188 189 190
                        }
                    }

                    private void RemoveProject(ProjectId projectId)
                    {
                        foreach (var analyzer in this.Analyzers)
                        {
                            analyzer.RemoveProject(projectId);
                        }
                    }

                    public override void Shutdown()
                    {
                        base.Shutdown();
191
                        _workItemQueue.Dispose();
192 193 194 195 196 197 198 199 200
                    }

                    internal void WaitUntilCompletion_ForTestingPurposesOnly(ImmutableArray<IIncrementalAnalyzer> analyzers, List<WorkItem> items)
                    {
                        var uniqueIds = new HashSet<ProjectId>();
                        foreach (var item in items)
                        {
                            if (uniqueIds.Add(item.ProjectId))
                            {
201
                                ProcessProjectAsync(analyzers, item, CancellationToken.None).Wait();
202 203 204 205 206 207 208
                            }
                        }
                    }

                    internal void WaitUntilCompletion_ForTestingPurposesOnly()
                    {
                        // this shouldn't happen. would like to get some diagnostic
209
                        while (_workItemQueue.HasAnyWork)
210 211 212 213 214 215 216 217 218
                        {
                            Environment.FailFast("How?");
                        }
                    }
                }
            }
        }
    }
}