Advanced Concurrency Patterns in Go
This guide walks through advanced concurrency patterns in Go with practical, runnable examples: a processing pipeline, a rate-limited worker pool, context-based cancellation, and fan-out/fan-in with error handling.
Pipeline Pattern
Pipelines allow you to process data through a series of stages:
package main
import (
	"fmt"
	"math"
)

// Stage 1: Generate numbers
func generateNumbers(done <-chan struct{}) <-chan int {
	numbers := make(chan int)
	go func() {
		defer close(numbers)
		for i := 2; i <= 20; i++ {
			select {
			case <-done:
				return
			case numbers <- i:
			}
		}
	}()
	return numbers
}

// Stage 2: Filter prime numbers
func filterPrimes(done <-chan struct{}, input <-chan int) <-chan int {
	primes := make(chan int)
	go func() {
		defer close(primes)
		for num := range input {
			if isPrime(num) {
				select {
				case <-done:
					return
				case primes <- num:
				}
			}
		}
	}()
	return primes
}

// Stage 3: Square numbers
func square(done <-chan struct{}, input <-chan int) <-chan int {
	squared := make(chan int)
	go func() {
		defer close(squared)
		for num := range input {
			select {
			case <-done:
				return
			case squared <- num * num:
			}
		}
	}()
	return squared
}

// Helper function to check if a number is prime
func isPrime(n int) bool {
	if n <= 1 {
		return false
	}
	for i := 2; i <= int(math.Sqrt(float64(n))); i++ {
		if n%i == 0 {
			return false
		}
	}
	return true
}

func main() {
	// Create done channel for cleanup
	done := make(chan struct{})
	defer close(done)

	// Set up pipeline
	numbers := generateNumbers(done)
	primes := filterPrimes(done, numbers)
	squared := square(done, primes)

	// Print results
	fmt.Println("Squares of prime numbers between 2 and 20:")
	for num := range squared {
		fmt.Printf("%d ", num)
	}
	fmt.Println()
}
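Because every stage selects on the done channel as well as on its output channel, closing done unwinds the whole pipeline without leaking goroutines. As a minimal sketch (reusing generateNumbers, filterPrimes, and square from above; the firstThreeSquares name is purely illustrative), a consumer can stop early and rely on the deferred close:

// Sketch: consume only the first three squared primes, then stop early.
// Reuses generateNumbers, filterPrimes, and square from the example above.
func firstThreeSquares() []int {
	done := make(chan struct{})
	defer close(done) // closing done lets every upstream stage exit

	squared := square(done, filterPrimes(done, generateNumbers(done)))

	var out []int
	for num := range squared {
		out = append(out, num)
		if len(out) == 3 {
			break // stop early; the deferred close(done) unblocks the producers
		}
	}
	return out
}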
Worker Pool with Rate Limiting

This example demonstrates a worker pool that processes tasks with rate limiting:
package main
import (
	"fmt"
	"log"
	"math/rand"
	"strings"
	"sync"
	"time"
)
// Task represents work to be done
type Task struct {
	ID       int
	Duration time.Duration
}

// Result represents the output of processing a task
type Result struct {
	TaskID    int
	WorkerID  int
	StartTime time.Time
	Duration  time.Duration
}

// Worker pool with rate limiting
func worker(
	id int,
	tasks <-chan Task,
	results chan<- Result,
	rateLimiter <-chan time.Time,
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	for task := range tasks {
		// Wait for rate limiter
		<-rateLimiter

		// Process task
		startTime := time.Now()
		log.Printf("Worker %d starting task %d\n", id, task.ID)

		// Simulate work
		time.Sleep(task.Duration)

		// Send result
		results <- Result{
			TaskID:    task.ID,
			WorkerID:  id,
			StartTime: startTime,
			Duration:  task.Duration,
		}

		log.Printf("Worker %d completed task %d\n", id, task.ID)
	}
}

func main() {
	// Configuration
	const (
		numWorkers = 3
		numTasks   = 10
		rateLimit  = 2 // tasks per second
	)

	// Create channels
	tasks := make(chan Task, numTasks)
	results := make(chan Result, numTasks)
	rateLimiter := time.Tick(time.Second / time.Duration(rateLimit))

	// Start workers
	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, tasks, results, rateLimiter, &wg)
	}

	// Send tasks
	go func() {
		for i := 1; i <= numTasks; i++ {
			tasks <- Task{
				ID:       i,
				Duration: time.Duration(rand.Intn(1000)) * time.Millisecond,
			}
		}
		close(tasks)
	}()

	// Wait for workers in a separate goroutine
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect and print results
	var completedTasks []Result
	for result := range results {
		completedTasks = append(completedTasks, result)
	}

	// Print summary
	fmt.Printf("\nTask Completion Summary:\n")
	fmt.Printf("%-8s %-8s %-20s %-15s\n", "Task ID", "Worker", "Start Time", "Duration")
	fmt.Println(strings.Repeat("-", 60))

	for _, r := range completedTasks {
		fmt.Printf("%-8d %-8d %-20s %-15s\n",
			r.TaskID, r.WorkerID, r.StartTime.Format("15:04:05.000"), r.Duration)
	}
}
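One design note on the rate limiter: time.Tick is convenient for a short-lived example, but it gives you no handle to stop the underlying ticker. For a long-running service you may prefer time.NewTicker, which can be stopped explicitly. A rough sketch of that substitution inside main, under the same variable names as above:

// Sketch: replace the time.Tick call in main with an explicitly stoppable ticker.
ticker := time.NewTicker(time.Second / time.Duration(rateLimit))
defer ticker.Stop() // release the ticker when main returns

// ticker.C has the same type as time.Tick's result (<-chan time.Time),
// so it slots into the existing worker-start loop:
go worker(i, tasks, results, ticker.C, &wg)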
Context-Based Cancellation

This example shows how to use context for graceful cancellation:
package main
import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// Service represents a service that performs long-running operations
type Service struct {
	tasks    chan int
	results  chan string
	cancel   context.CancelFunc
	wg       sync.WaitGroup
	shutdown chan struct{}
}

// NewService creates a new service instance
func NewService() *Service {
	return &Service{
		tasks:    make(chan int, 10),
		results:  make(chan string, 10),
		shutdown: make(chan struct{}),
	}
}

// Start initializes the service with the specified number of workers
func (s *Service) Start(numWorkers int) {
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel

	for i := 1; i <= numWorkers; i++ {
		s.wg.Add(1)
		go s.worker(ctx, i)
	}

	// Monitor results
	go s.monitorResults()
}

// worker processes tasks
func (s *Service) worker(ctx context.Context, id int) {
	defer s.wg.Done()

	for {
		select {
		case <-ctx.Done():
			log.Printf("Worker %d shutting down\n", id)
			return
		case task, ok := <-s.tasks:
			if !ok {
				return
			}
			// Process task
			result := s.processTask(ctx, id, task)
			if result != "" {
				s.results <- result
			}
		}
	}
}

// processTask simulates task processing
func (s *Service) processTask(ctx context.Context, workerID, taskID int) string {
	log.Printf("Worker %d processing task %d\n", workerID, taskID)

	// Simulate work with potential cancellation
	select {
	case <-ctx.Done():
		log.Printf("Task %d cancelled\n", taskID)
		return ""
	case <-time.After(time.Duration(taskID*500) * time.Millisecond):
		return fmt.Sprintf("Task %d completed by worker %d", taskID, workerID)
	}
}

// Monitor and handle results
func (s *Service) monitorResults() {
	for result := range s.results {
		log.Println(result)
	}
	close(s.shutdown)
}

// SubmitTask adds a new task to the service
func (s *Service) SubmitTask(task int) {
	s.tasks <- task
}

// Shutdown gracefully stops the service
func (s *Service) Shutdown() {
	log.Println("Initiating shutdown...")

	// Cancel all ongoing operations
	s.cancel()

	// Close tasks channel
	close(s.tasks)

	// Wait for all workers to finish
	s.wg.Wait()

	// Close results channel
	close(s.results)

	// Wait for monitor to finish
	<-s.shutdown

	log.Println("Shutdown complete")
}

func main() {
	// Create and start service
	service := NewService()
	service.Start(3)

	// Submit some tasks
	for i := 1; i <= 5; i++ {
		service.SubmitTask(i)
	}

	// Run for a while
	time.Sleep(2 * time.Second)

	// Shutdown service
	service.Shutdown()
}
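In a real service you would usually drive Shutdown from an OS signal rather than a fixed sleep. A minimal sketch of that wiring, assuming additional os, os/signal, and syscall imports (signalAwareMain is a hypothetical stand-in for the main function above):

// Sketch: trigger Shutdown on SIGINT/SIGTERM instead of sleeping.
// signalAwareMain is a hypothetical replacement for main above.
func signalAwareMain() {
	service := NewService()
	service.Start(3)

	for i := 1; i <= 5; i++ {
		service.SubmitTask(i)
	}

	// Block until the process receives an interrupt or termination signal.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	<-ctx.Done()

	service.Shutdown()
}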
Fan-Out Fan-In with Error Handling

This example demonstrates the fan-out fan-in pattern with proper error handling:
package main
import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Result represents the outcome of processing
type Result struct {
	Value int
	Err   error
}
// generator produces the stream of numbers to be processed
func generator(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= 100; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}
// Processor handles the actual work
func processor(ctx context.Context, id int, input <-chan int) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for num := range input {
			// Simulate work and possible errors
			result := Result{Value: num}

			// Simulate processing time
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

			// Simulate occasional errors
			if rand.Float32() < 0.1 { // 10% chance of error
				result.Err = fmt.Errorf("processor %d: error processing %d", id, num)
			} else {
				result.Value *= 2 // Double the number
			}

			select {
			case <-ctx.Done():
				return
			case out <- result:
			}
		}
	}()
	return out
}

// Merge combines multiple channels into one
func merge(ctx context.Context, cs ...<-chan Result) <-chan Result {
	var wg sync.WaitGroup
	out := make(chan Result)

	// Start an output goroutine for each input channel
	output := func(c <-chan Result) {
		defer wg.Done()
		for r := range c {
			select {
			case <-ctx.Done():
				return
			case out <- r:
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all output goroutines are done
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	// Create context with cancellation
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create input channel
	input := generator(ctx)

	// Fan out to multiple processors
	numProcessors := 3
	processors := make([]<-chan Result, numProcessors)
	for i := 0; i < numProcessors; i++ {
		processors[i] = processor(ctx, i+1, input)
	}

	// Fan in the results
	results := merge(ctx, processors...)

	// Process results
	var (
		processed int
		errors    []error
	)

	// Collect and handle results
	for result := range results {
		if result.Err != nil {
			errors = append(errors, result.Err)
			continue
		}
		processed++
	}

	// Print summary
	fmt.Printf("\nProcessing Summary:\n")
	fmt.Printf("Successfully processed: %d numbers\n", processed)
	fmt.Printf("Encountered %d errors:\n", len(errors))
	for _, err := range errors {
		fmt.Printf("- %v\n", err)
	}
}
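If all you need is to fan work out and stop at the first failure, the golang.org/x/sync/errgroup package can stand in for the hand-rolled merge. A rough sketch under that assumption (doWork is a hypothetical per-worker function, not part of the example above):

// Sketch: fan-out with errgroup; the first non-nil error cancels ctx
// for all other goroutines. Requires golang.org/x/sync/errgroup.
func runWithErrgroup(doWork func(ctx context.Context, id int) error) error {
	g, ctx := errgroup.WithContext(context.Background())
	for i := 1; i <= 3; i++ {
		id := i // capture the loop variable (needed before Go 1.22)
		g.Go(func() error {
			return doWork(ctx, id)
		})
	}
	return g.Wait() // returns the first error encountered, if any
}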
These examples demonstrate advanced concurrency patterns with proper error handling, cancellation, and resource management. Each example is fully functional and can be used as a starting point for real-world applications.

In the next post, we’ll explore Microservices with Go!