/// <summary>
/// Creates a processor whose stage list holds a <c>ValidationStage</c>, a
/// <c>TransformationStage</c> and an <c>EnrichmentStage</c>, added in that order.
/// </summary>
/// <param name="parallelismLevel">
/// Value stored in <c>_parallelismLevel</c>; defaults to 3. It is stored as-is,
/// with no range check performed here.
/// </param>
public ParallelPipelineProcessor(int parallelismLevel = 3)
{
    // Build the list locally first so the field only ever sees the fully populated list.
    var pipeline = new List<IPipelineStage<WorkItem, WorkItem>>();
    pipeline.Add(new ValidationStage());
    pipeline.Add(new TransformationStage());
    pipeline.Add(new EnrichmentStage());

    _stages = pipeline;
    _parallelismLevel = parallelismLevel;
}
// Stage 2: Data Transformation public class TransformationStage : IPipelineStage<WorkItem, WorkItem> { public string StageName => "Transformation"; net fx 4.0
// Simulate enrichment (e.g., database lookup, API call) await Task.Delay(80, token); input.ProcessedData = $"[ENRICHED] {input.ProcessedData} - Length: {input.ProcessedData.Length}"; Console.WriteLine($"[{StageName}] Item {input.Id}: Enriched data"); return input; } } { new ValidationStage()
public async Task<WorkItem> ProcessAsync(WorkItem input, CancellationToken token) { if (!input.IsValid) return input; new EnrichmentStage() }