Simple worker pool using generic channels
Generics in go offer us the ability to create types that can be used to represent different types of values. Though so far we been able to tackle everything without them one needs to admit that generics can become handy when dealing with complex types and process pipelines (though they obviously bring a new layer of complexity to deal with).
Let’s explore such a use case by implementing a simple worker pool using generic channels in order to process some tasks and pass the results of each task to another worker pool.
What we want to achieve
Let’s say that we have a minio bucket storing assets for clients at the root of the bucket and we want to safely rename each assets
to a “scoped” name in such a way that the resulting name will be under a client specific prefix like such : clients/<client_id>/assets/<file_name>
.
Let’s also imagine that we have a table in a database somewhere referencing each asset with their filename and that we’d like to migrate all the assets in such a way that it wouldn’t cause any disruption to the service accessibility.
Here one could do to achieve such a task in a simple way :
- First retrieve the list of assets from the database.
- Then for each asset, retrieve the file from minio and copy it to the new location.
- Then update the database with the new filename.
- Finally, delete the old file from minio.
As you can imagine if we have lots of assets and we decide to do this sequentially, this will be a very long process. Also if we decide to do it in parallel, we’ll have to constrain ourselves to a certain number of concurrent workers in order to avoid overloading the system and eat up all the resources.
Having considered all the above, we can see that generic channels and worker pools seems to be a good fit for such a use case.
Defining the worker pool package
Let’s start by defining the worker pool package that will be used to manage the workers and process our tasks.
package pool
import (
"context"
"reflect"
"sync"
)
// JobConsumer is the interface that represents a consumer of jobs.
type JobConsumer interface {
ConsumeJobs(ctx context.Context) JobConsumer
}
// ResultProducer is the interface that represents a producer of jobs results.
type ResultProducer interface {
JobResults() <-chan interface{}
}
// WorkerPool is the concrete implementation of the JobConsumer interface
// using a generic channel as the underlying jobs provider.
type WorkerPool[JOB any, JOB_RESULT any] struct {
workersNum uint
jobsChan chan JOB
resultsChan chan JOB_RESULT
waitGroup sync.WaitGroup
}
// JobConsumerFunc is the type of the function that will be executed by the workers
type JobConsumerFunc[JOB any, JOB_RESULT any] func(ctx context.Context, job JOB) JOB_RESULT
// NewWorkerPool is a constructor for the WorkerPool type.
func NewWorkerPool[JOB any, JOB_RESULT any](workersNum uint, jobsChan chan JOB) *WorkerPool[JOB, JOB_RESULT] {
return &WorkerPool[JOB, JOB_RESULT]{
workersNum: workersNum,
jobsChan: jobsChan,
resultsChan: make(chan JOB_RESULT, workersNum),
waitGroup: sync.WaitGroup{},
}
}
// Consume is the implementation of the JobConsumer interface.
func (wp *WorkerPool[JOB, JOB_RESULT]) ConsumeJobs(ctx context.Context, jobConsumerFunc JobConsumerFunc[JOB, JOB_RESULT]) *WorkerPool[JOB, JOB_RESULT] {
for i := uint(0); i < wp.workersNum; i++ {
wp.waitGroup.Add(1)
go func() {
defer wp.waitGroup.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-wp.jobsChan:
if !ok {
return
}
res := jobConsumerFunc(ctx, job)
if reflect.ValueOf(res).Kind() == reflect.Pointer && reflect.ValueOf(res).IsNil() {
continue
}
wp.resultsChan <- res
}
}
}()
}
go func() {
wp.waitGroup.Wait()
close(wp.resultsChan)
}()
return wp
}
func (wp *WorkerPool[JOB, JOB_RESULT]) JobResults() chan JOB_RESULT {
return wp.resultsChan
}
func (wp *WorkerPool[JOB, JOB_RESULT]) Wait() {
for range wp.resultsChan {
}
}
This basic implementation allows us to create a worker pool that will be able to process any type of jobs and produce a result of any type that can be consumed through the exposed result chanel.
The processing can be cancelled at any point by closing the provided context. As of now this implementation doesn’t support graceful shutdown as it’ll be abruptly closed but this could be easily added in the future.
Note that within the job consumer function we use the reflect package to check if the result is a pointer and if it is, we check if it is nil in order to avoid sending the processed result to the result channel.
The worker pool will start consuming jobs from the provided jobs channel as soon as the ConsumeJobs method is called. This one takes in a function that will be executed by the workers so as to provide the actual processing logic by the caller.
This implementation, though very basic, offer great flexibility as the worker pools have the ability to chain themselves by make the result of the previous one become the input of the next one. Let’s see how it can be done in the next section.
Putting it all together
Let’s see how we can use our newly created pool package to migrate our assets by creating three worker pool which will be chained together to first copy the assets with the new name, then update the database with the new name, and finally delete the old files.
As you’ll see we’ll use pointers as jobs in order to return a nil
value if one of the worker fails. By doing so
we’ll be able to chain the workers and avoid the need to check for the result of the previous one (basically if during the
database rename stage we encounter an error, then we’ll avoid deleting the original asset as the database still points to this one).
You’ll also notice that having multiple pools we are able to fine tune the number of workers allocated to each stage. Therefore we’ll provide 100 workers for the copy stage, and only 10 for the database update stage as to limit the number of concurrent database updates (which is usually more likely to encounter performance issue than the copy stage).
package main
import (
"context"
"fmt"
"sync"
"pool/pkg/bucket"
"pool/pkg/db"
"pool/pkg/models"
"pool/pkg/pool"
)
func main() {
jobsNum := uint(10000)
db := db.NewFakeDB(jobsNum)
copier := bucket.NewFakeAssetCopierRemover(jobsNum)
summary := models.NewSummary()
ctx := context.Background()
jobs := make(chan models.ClientAsset, 10)
// Create a pool of 100 workers that will copy the files from the source to the destination.
copyAssetsPool := pool.NewWorkerPool[models.ClientAsset, *models.CopiedClientAsset](100, jobs)
copyAssetsPool.ConsumeJobs(ctx, func(ctx context.Context, job models.ClientAsset) *models.CopiedClientAsset {
if err := copier.CopyAsset(ctx, job.AssetName, job.ClientID); err != nil {
fmt.Printf("failed to copy asset %s: %s\n", job.AssetName, err)
return nil
}
summary.AddCopiedAssetsNum()
fmt.Printf("copied asset %s\n", job.AssetName)
return &models.CopiedClientAsset{
ClientID: job.ClientID,
OriginalAssetName: job.AssetName,
NewAssetName: fmt.Sprintf("clients/%s/assets/%s", job.ClientID, job.AssetName),
}
})
// Create a pool of 10 workers that will update the filename in the database to point to the new assets.
updateAssetsNamePool := pool.NewWorkerPool[*models.CopiedClientAsset, *models.RenamedClientAsset](10, copyAssetsPool.JobResults())
updateAssetsNamePool.ConsumeJobs(ctx, func(ctx context.Context, job *models.CopiedClientAsset) *models.RenamedClientAsset {
if err := db.UpdateAssetName(ctx, job.ClientID, job.NewAssetName); err != nil {
fmt.Printf("failed to update asset name %s: %s\n", job.NewAssetName, err)
return nil
}
summary.AddRenamedAssetsNum()
fmt.Printf("updated asset name %s\n", job.NewAssetName)
return &models.RenamedClientAsset{
OriginalAssetName: job.OriginalAssetName,
}
})
// Create a pool of 100 workers that will delete the original source files that we've copied.
deleteOriginalAssetsPool := pool.NewWorkerPool[*models.RenamedClientAsset, *models.DeletedClientAsset](100, updateAssetsNamePool.JobResults())
deleteOriginalAssetsPool.ConsumeJobs(ctx, func(ctx context.Context, job *models.RenamedClientAsset) *models.DeletedClientAsset {
if err := copier.RemoveAsset(ctx, job.OriginalAssetName); err != nil {
fmt.Printf("failed to delete asset %s: %s\n", job.OriginalAssetName, err)
return nil
}
summary.AddDeletedAssetsNum()
fmt.Printf("deleted asset %s\n", job.OriginalAssetName)
return &models.DeletedClientAsset{
OriginalAssetName: job.OriginalAssetName,
}
})
// Start the pipeline by sending jobs. Once all jobs have been sent we can safely close the jobs channel.
go func() {
assets, err := db.GetClientsAssets(ctx)
if err != nil {
panic(err)
}
for _, asset := range assets {
jobs <- asset
}
close(jobs)
}()
deleteOriginalAssetsPool.Wait()
fmt.Printf("summary: %+v\n", summary)
}
Here is the reference to the models package that we’ve used in this example.
package models
import "sync"
type ClientAsset struct {
ClientID string
AssetName string
}
type CopiedClientAsset struct {
ClientID string
OriginalAssetName string
NewAssetName string
}
type RenamedClientAsset struct {
OriginalAssetName string
}
type DeletedClientAsset struct {
OriginalAssetName string
}
type Summary struct {
CopiedAssetsNum uint
RenamedAssetsNum uint
DeletedAssetsNum uint
mu sync.Mutex
}
func NewSummary() *Summary {
return &Summary{}
}
func (summary *Summary) AddCopiedAssetsNum() {
summary.mu.Lock()
defer summary.mu.Unlock()
summary.CopiedAssetsNum++
}
func (summary *Summary) AddRenamedAssetsNum() {
summary.mu.Lock()
defer summary.mu.Unlock()
summary.RenamedAssetsNum++
}
func (summary *Summary) AddDeletedAssetsNum() {
summary.mu.Lock()
defer summary.mu.Unlock()
summary.DeletedAssetsNum++
}
Running the example will output the following (As this program is running concurrently, the output will be random):
...
deleted asset asset-2345
deleted asset asset-9538
deleted asset asset-9387
copied asset asset-1218
deleted asset asset-6944
deleted asset asset-4046
deleted asset asset-9191
deleted asset asset-9431
deleted asset asset-4766
deleted asset asset-7688
updated asset name clients/client-2220/assets/asset-2220
deleted asset asset-2220
deleted asset asset-1208
deleted asset asset-3193
updated asset name clients/client-1218/assets/asset-1218
deleted asset asset-1218
summary: &{CopiedAssetsNum:10000 RenamedAssetsNum:10000 DeletedAssetsNum:10000 mu:{state:0 sema:0}}
Obviously this is a basic example, but it’s a good starting point for understanding how to use the pool package and build upon it to provide an easy way to safely consume jobs and take advantage of generics.
Next time we’ll try to build upon the pool package and provide helper functions to make it easier chain workers together.