WorkCoordinator.LowPriorityProcessor.cs 10.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Copyright (c) Microsoft.  All Rights Reserved.  Licensed under the Apache License, Version 2.0.  See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.ErrorReporting;
using Microsoft.CodeAnalysis.Internal.Log;
using Microsoft.CodeAnalysis.Notification;
using Microsoft.CodeAnalysis.Shared.TestHooks;
using Roslyn.Utilities;

namespace Microsoft.CodeAnalysis.SolutionCrawler
{
16
    internal sealed partial class SolutionCrawlerRegistrationService
17 18 19 20 21
    {
        private sealed partial class WorkCoordinator
        {
            private sealed partial class IncrementalAnalyzerProcessor
            {
22
                private sealed class LowPriorityProcessor : AbstractPriorityProcessor
23
                {
24 25
                    private readonly Lazy<ImmutableArray<IIncrementalAnalyzer>> _lazyAnalyzers;
                    private readonly AsyncProjectWorkItemQueue _workItemQueue;
26 27 28 29 30 31 32 33 34 35

                    public LowPriorityProcessor(
                        IAsynchronousOperationListener listener,
                        IncrementalAnalyzerProcessor processor,
                        Lazy<ImmutableArray<IIncrementalAnalyzer>> lazyAnalyzers,
                        IGlobalOperationNotificationService globalOperationNotificationService,
                        int backOffTimeSpanInMs,
                        CancellationToken shutdownToken) :
                        base(listener, processor, globalOperationNotificationService, backOffTimeSpanInMs, shutdownToken)
                    {
36
                        _lazyAnalyzers = lazyAnalyzers;
37
                        _workItemQueue = new AsyncProjectWorkItemQueue(processor._registration.ProgressReporter, processor._registration.Workspace);
38 39 40 41 42 43 44 45

                        Start();
                    }

                    internal ImmutableArray<IIncrementalAnalyzer> Analyzers
                    {
                        get
                        {
46
                            return _lazyAnalyzers.Value;
47 48 49 50 51
                        }
                    }

                    protected override Task WaitAsync(CancellationToken cancellationToken)
                    {
52
                        return _workItemQueue.WaitAsync(cancellationToken);
53 54 55 56 57 58 59
                    }

                    protected override async Task ExecuteAsync()
                    {
                        try
                        {
                            // we wait for global operation, higher and normal priority processor to finish its working
H
Heejae Chang 已提交
60
                            await WaitForHigherPriorityOperationsAsync().ConfigureAwait(false);
61

62
                            // process any available project work, preferring the active project.
63
                            WorkItem workItem;
64
                            CancellationTokenSource projectCancellation;
65 66 67
                            if (_workItemQueue.TryTakeAnyWork(
                                this.Processor.GetActiveProject(), this.Processor.DependencyGraph, this.Processor.DiagnosticAnalyzerService, 
                                out workItem, out projectCancellation))
68 69 70 71
                            {
                                await ProcessProjectAsync(this.Analyzers, workItem, projectCancellation).ConfigureAwait(false);
                            }
                        }
72
                        catch (Exception e) when (FatalError.ReportUnlessCanceled(e))
73 74 75
                        {
                            throw ExceptionUtilities.Unreachable;
                        }
76
                    }
77

H
Heejae Chang 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
                    protected override Task HigherQueueOperationTask
                    {
                        get
                        {
                            return Task.WhenAll(this.Processor._highPriorityProcessor.Running, this.Processor._normalPriorityProcessor.Running);
                        }
                    }

                    protected override bool HigherQueueHasWorkItem
                    {
                        get
                        {
                            return this.Processor._highPriorityProcessor.HasAnyWork || this.Processor._normalPriorityProcessor.HasAnyWork;
                        }
                    }

94 95
                    protected override void PauseOnGlobalOperation()
                    {
96 97
                        base.PauseOnGlobalOperation();

98 99 100
                        _workItemQueue.RequestCancellationOnRunningTasks();
                    }

101 102 103 104 105
                    public void Enqueue(WorkItem item)
                    {
                        this.UpdateLastAccessTime();

                        // Project work
106
                        item = item.With(documentId: null, projectId: item.ProjectId, asyncToken: this.Processor._listener.BeginAsyncOperation("WorkItem"));
107

108
                        var added = _workItemQueue.AddOrReplace(item);
109

H
Heejae Chang 已提交
110 111 112 113
                        // lower priority queue gets lowest time slot possible. if there is any activity going on in higher queue, it drop whatever it has
                        // and let higher work item run
                        CancelRunningTaskIfHigherQueueHasWorkItem();

114
                        Logger.Log(FunctionId.WorkCoordinator_Project_Enqueue, s_enqueueLogger, Environment.TickCount, item.ProjectId, !added);
115

116
                        SolutionCrawlerLogger.LogWorkItemEnqueue(this.Processor._logAggregator, item.ProjectId);
117 118
                    }

H
Heejae Chang 已提交
119 120 121 122 123 124 125 126 127 128
                    private void CancelRunningTaskIfHigherQueueHasWorkItem()
                    {
                        if (!HigherQueueHasWorkItem)
                        {
                            return;
                        }

                        _workItemQueue.RequestCancellationOnRunningTasks();
                    }

129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
                    private async Task ProcessProjectAsync(ImmutableArray<IIncrementalAnalyzer> analyzers, WorkItem workItem, CancellationTokenSource source)
                    {
                        if (this.CancellationToken.IsCancellationRequested)
                        {
                            return;
                        }

                        // we do have work item for this project
                        var projectId = workItem.ProjectId;
                        var processedEverything = false;
                        var processingSolution = this.Processor.CurrentSolution;

                        try
                        {
                            using (Logger.LogBlock(FunctionId.WorkCoordinator_ProcessProjectAsync, source.Token))
                            {
                                var cancellationToken = source.Token;

                                var project = processingSolution.GetProject(projectId);
                                if (project != null)
                                {
150 151 152
                                    var reasons = workItem.InvocationReasons;
                                    var semanticsChanged = reasons.Contains(PredefinedInvocationReasons.SemanticChanged) ||
                                                           reasons.Contains(PredefinedInvocationReasons.SolutionRemoved);
153

154
                                    using (Processor.EnableCaching(project.Id))
155
                                    {
156
                                        await RunAnalyzersAsync(analyzers, project, (a, p, c) => a.AnalyzeProjectAsync(p, semanticsChanged, reasons, c), cancellationToken).ConfigureAwait(false);
157 158 159 160
                                    }
                                }
                                else
                                {
161
                                    SolutionCrawlerLogger.LogProcessProjectNotExist(this.Processor._logAggregator);
162 163 164 165 166 167 168

                                    RemoveProject(projectId);
                                }
                            }

                            processedEverything = true;
                        }
169
                        catch (Exception e) when (FatalError.ReportUnlessCanceled(e))
170 171 172 173 174 175 176 177 178
                        {
                            throw ExceptionUtilities.Unreachable;
                        }
                        finally
                        {
                            // we got cancelled in the middle of processing the project.
                            // let's make sure newly enqueued work item has all the flag needed.
                            if (!processedEverything)
                            {
179
                                _workItemQueue.AddOrReplace(workItem.Retry(this.Listener.BeginAsyncOperation("ReenqueueWorkItem")));
180 181
                            }

182
                            SolutionCrawlerLogger.LogProcessProject(this.Processor._logAggregator, projectId.Id, processedEverything);
183 184

                            // remove one that is finished running
185
                            _workItemQueue.RemoveCancellationSource(projectId);
186 187 188 189 190 191 192 193 194 195 196 197 198 199
                        }
                    }

                    private void RemoveProject(ProjectId projectId)
                    {
                        foreach (var analyzer in this.Analyzers)
                        {
                            analyzer.RemoveProject(projectId);
                        }
                    }

                    public override void Shutdown()
                    {
                        base.Shutdown();
200
                        _workItemQueue.Dispose();
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
                    }

                    internal void WaitUntilCompletion_ForTestingPurposesOnly(ImmutableArray<IIncrementalAnalyzer> analyzers, List<WorkItem> items)
                    {
                        CancellationTokenSource source = new CancellationTokenSource();

                        var uniqueIds = new HashSet<ProjectId>();
                        foreach (var item in items)
                        {
                            if (uniqueIds.Add(item.ProjectId))
                            {
                                ProcessProjectAsync(analyzers, item, source).Wait();
                            }
                        }
                    }

                    internal void WaitUntilCompletion_ForTestingPurposesOnly()
                    {
                        // this shouldn't happen. would like to get some diagnostic
220
                        while (_workItemQueue.HasAnyWork)
221 222 223 224 225 226 227 228 229
                        {
                            Environment.FailFast("How?");
                        }
                    }
                }
            }
        }
    }
}