diff --git a/CHANGELOG.md b/CHANGELOG.md index 28984b1..9fde16a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] - xxxx-xx-xx +### Added +- [#24](https://github.com/elyby/chrly/issues/24): Added a new batch Mojang UUIDs provider strategy `full-bus` and + corresponding configuration param `QUEUE_STRATEGY` with the default value `periodic`. ## [4.4.1] - 2020-04-24 ### Added diff --git a/README.md b/README.md index 3fc740a..df12620 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,14 @@ docker-compose up -d app Sentry can be used to collect app errors https://public:private@your.sentry.io/1 + + QUEUE_STRATEGY + + Sets the strategy for batch Mojang UUIDs provider queue. Allowed values are periodic and + full-bus (see #24). + + periodic + QUEUE_LOOP_DELAY diff --git a/di/mojang_textures.go b/di/mojang_textures.go index dee1bec..42f385c 100644 --- a/di/mojang_textures.go +++ b/di/mojang_textures.go @@ -1,6 +1,7 @@ package di import ( + "context" "fmt" "net/url" "time" @@ -18,6 +19,9 @@ var mojangTextures = di.Options( di.Provide(newMojangTexturesProvider), di.Provide(newMojangTexturesUuidsProviderFactory), di.Provide(newMojangTexturesBatchUUIDsProvider), + di.Provide(newMojangTexturesBatchUUIDsProviderStrategyFactory), + di.Provide(newMojangTexturesBatchUUIDsProviderDelayedStrategy), + di.Provide(newMojangTexturesBatchUUIDsProviderFullBusStrategy), di.Provide(newMojangTexturesRemoteUUIDsProvider), di.Provide(newMojangSignedTexturesProvider), di.Provide(newMojangTexturesStorageFactory), @@ -75,7 +79,7 @@ func newMojangTexturesUuidsProviderFactory( func newMojangTexturesBatchUUIDsProvider( container *di.Container, - config *viper.Viper, + strategy mojangtextures.BatchUuidsProviderStrategy, emitter mojangtextures.Emitter, ) (*mojangtextures.BatchUuidsProvider, error) { if err := container.Provide(func(emitter es.Subscriber, config *viper.Viper) *namedHealthChecker { @@ -106,14 +110,56 @@ func newMojangTexturesBatchUUIDsProvider( return nil, err } + return mojangtextures.NewBatchUuidsProvider(context.Background(), strategy, emitter), nil +} + +func newMojangTexturesBatchUUIDsProviderStrategyFactory( + container *di.Container, + config *viper.Viper, +) (mojangtextures.BatchUuidsProviderStrategy, error) { + config.SetDefault("queue.strategy", "periodic") + + strategyName := config.GetString("queue.strategy") + switch strategyName { + case "periodic": + var strategy *mojangtextures.PeriodicStrategy + err := container.Resolve(&strategy) + if err != nil { + return nil, err + } + + return strategy, nil + case "full-bus": + var strategy *mojangtextures.FullBusStrategy + err := container.Resolve(&strategy) + if err != nil { + return nil, err + } + + return strategy, nil + default: + return nil, fmt.Errorf("unknown queue strategy \"%s\"", strategyName) + } +} + +func newMojangTexturesBatchUUIDsProviderDelayedStrategy(config *viper.Viper) *mojangtextures.PeriodicStrategy { config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond) config.SetDefault("queue.batch_size", 10) - return &mojangtextures.BatchUuidsProvider{ - Emitter: emitter, - IterationDelay: config.GetDuration("queue.loop_delay"), - IterationSize: config.GetInt("queue.batch_size"), - }, nil + return mojangtextures.NewPeriodicStrategy( + config.GetDuration("queue.loop_delay"), + config.GetInt("queue.batch_size"), + ) +} + +func newMojangTexturesBatchUUIDsProviderFullBusStrategy(config *viper.Viper) *mojangtextures.FullBusStrategy { + config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond) + config.SetDefault("queue.batch_size", 10) + + return mojangtextures.NewFullBusStrategy( + config.GetDuration("queue.loop_delay"), + config.GetInt("queue.batch_size"), + ) } func newMojangTexturesRemoteUUIDsProvider( diff --git a/mojangtextures/batch_uuids_provider.go b/mojangtextures/batch_uuids_provider.go index 2cac5b9..cb99a20 100644 --- a/mojangtextures/batch_uuids_provider.go +++ b/mojangtextures/batch_uuids_provider.go @@ -1,6 +1,7 @@ package mojangtextures import ( + "context" "strings" "sync" "time" @@ -9,131 +10,216 @@ import ( ) type jobResult struct { - profile *mojang.ProfileInfo - error error + Profile *mojang.ProfileInfo + Error error } -type jobItem struct { - username string - respondChan chan *jobResult +type job struct { + Username string + RespondChan chan *jobResult } type jobsQueue struct { lock sync.Mutex - items []*jobItem + items []*job } -func (s *jobsQueue) New() *jobsQueue { - s.items = []*jobItem{} - return s -} - -func (s *jobsQueue) Enqueue(t *jobItem) { - s.lock.Lock() - defer s.lock.Unlock() - - s.items = append(s.items, t) -} - -func (s *jobsQueue) Dequeue(n int) []*jobItem { - s.lock.Lock() - defer s.lock.Unlock() - - if n > s.size() { - n = s.size() +func newJobsQueue() *jobsQueue { + return &jobsQueue{ + items: []*job{}, } - - items := s.items[0:n] - s.items = s.items[n:len(s.items)] - - return items } -func (s *jobsQueue) Size() int { +func (s *jobsQueue) Enqueue(job *job) int { s.lock.Lock() defer s.lock.Unlock() - return s.size() -} + s.items = append(s.items, job) -func (s *jobsQueue) size() int { return len(s.items) } +func (s *jobsQueue) Dequeue(n int) ([]*job, int) { + s.lock.Lock() + defer s.lock.Unlock() + + l := len(s.items) + if n > l { + n = l + } + + items := s.items[0:n] + s.items = s.items[n:l] + + return items, l - n +} + var usernamesToUuids = mojang.UsernamesToUuids -var forever = func() bool { - return true + +type JobsIteration struct { + Jobs []*job + Queue int +} + +type BatchUuidsProviderStrategy interface { + Queue(job *job) + GetJobs(abort context.Context) <-chan *JobsIteration +} + +type PeriodicStrategy struct { + Delay time.Duration + Batch int + queue *jobsQueue +} + +func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy { + return &PeriodicStrategy{ + Delay: delay, + Batch: batch, + queue: newJobsQueue(), + } +} + +func (ctx *PeriodicStrategy) Queue(job *job) { + ctx.queue.Enqueue(job) +} + +func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteration { + ch := make(chan *JobsIteration) + go func() { + for { + select { + case <-abort.Done(): + return + case <-time.After(ctx.Delay): + jobs, queueLen := ctx.queue.Dequeue(ctx.Batch) + ch <- &JobsIteration{jobs, queueLen} + } + } + }() + + return ch +} + +type FullBusStrategy struct { + Delay time.Duration + Batch int + queue *jobsQueue + ready chan bool +} + +func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy { + return &FullBusStrategy{ + Delay: delay, + Batch: batch, + queue: newJobsQueue(), + ready: make(chan bool), + } +} + +func (ctx *FullBusStrategy) Queue(job *job) { + n := ctx.queue.Enqueue(job) + if n == ctx.Batch { + ctx.ready <- true + } +} + +func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration { + ch := make(chan *JobsIteration) + go func() { + for { + t := time.NewTimer(ctx.Delay) + select { + case <-abort.Done(): + return + case <-t.C: + ctx.sendJobs(ch) + case <-ctx.ready: + t.Stop() + ctx.sendJobs(ch) + } + } + }() + + return ch +} + +func (ctx *FullBusStrategy) sendJobs(ch chan *JobsIteration) { + jobs, queueLen := ctx.queue.Dequeue(ctx.Batch) + ch <- &JobsIteration{jobs, queueLen} // TODO: should not wait for iteration result } type BatchUuidsProvider struct { - Emitter - - IterationDelay time.Duration - IterationSize int - + context context.Context + emitter Emitter + strategy BatchUuidsProviderStrategy onFirstCall sync.Once - queue jobsQueue +} + +func NewBatchUuidsProvider(context context.Context, strategy BatchUuidsProviderStrategy, emitter Emitter) *BatchUuidsProvider { + return &BatchUuidsProvider{ + context: context, + emitter: emitter, + strategy: strategy, + } } func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) { - ctx.onFirstCall.Do(func() { - ctx.queue.New() - ctx.startQueue() - }) + ctx.onFirstCall.Do(ctx.startQueue) resultChan := make(chan *jobResult) - ctx.queue.Enqueue(&jobItem{username, resultChan}) - ctx.Emit("mojang_textures:batch_uuids_provider:queued", username) + ctx.strategy.Queue(&job{username, resultChan}) + ctx.emitter.Emit("mojang_textures:batch_uuids_provider:queued", username) result := <-resultChan - return result.profile, result.error + return result.Profile, result.Error } func (ctx *BatchUuidsProvider) startQueue() { go func() { - time.Sleep(ctx.IterationDelay) - for forever() { - ctx.Emit("mojang_textures:batch_uuids_provider:before_round") - ctx.queueRound() - ctx.Emit("mojang_textures:batch_uuids_provider:after_round") - time.Sleep(ctx.IterationDelay) + jobsChan := ctx.strategy.GetJobs(ctx.context) + for { + select { + case <-ctx.context.Done(): + return + case iteration := <-jobsChan: + ctx.emitter.Emit("mojang_textures:batch_uuids_provider:before_round") // TODO: where should I move this events? + ctx.performRequest(iteration) + ctx.emitter.Emit("mojang_textures:batch_uuids_provider:after_round") + } } }() } -func (ctx *BatchUuidsProvider) queueRound() { - queueSize := ctx.queue.Size() - jobs := ctx.queue.Dequeue(ctx.IterationSize) - - var usernames []string - for _, job := range jobs { - usernames = append(usernames, job.username) +func (ctx *BatchUuidsProvider) performRequest(iteration *JobsIteration) { + usernames := make([]string, len(iteration.Jobs)) + for i, job := range iteration.Jobs { + usernames[i] = job.Username } - ctx.Emit("mojang_textures:batch_uuids_provider:round", usernames, queueSize-len(jobs)) + ctx.emitter.Emit("mojang_textures:batch_uuids_provider:round", usernames, iteration.Queue) if len(usernames) == 0 { return } profiles, err := usernamesToUuids(usernames) - ctx.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err) - for _, job := range jobs { - go func(job *jobItem) { - response := &jobResult{} - if err != nil { - response.error = err - } else { - // The profiles in the response aren't ordered, so we must search each username over full array - for _, profile := range profiles { - if strings.EqualFold(job.username, profile.Name) { - response.profile = profile - break - } + ctx.emitter.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err) + for _, job := range iteration.Jobs { + response := &jobResult{} + if err == nil { + // The profiles in the response aren't ordered, so we must search each username over full array + for _, profile := range profiles { + if strings.EqualFold(job.Username, profile.Name) { + response.Profile = profile + break } } + } else { + response.Error = err + } - job.respondChan <- response - }(job) + job.RespondChan <- response + close(job.RespondChan) } } diff --git a/mojangtextures/batch_uuids_provider_test.go b/mojangtextures/batch_uuids_provider_test.go index 55e785a..820aa8b 100644 --- a/mojangtextures/batch_uuids_provider_test.go +++ b/mojangtextures/batch_uuids_provider_test.go @@ -1,64 +1,50 @@ package mojangtextures import ( + "context" "crypto/rand" "encoding/base64" "strings" "testing" - testify "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/elyby/chrly/api/mojang" ) func TestJobsQueue(t *testing.T) { - createQueue := func() *jobsQueue { - queue := &jobsQueue{} - queue.New() - - return queue - } - t.Run("Enqueue", func(t *testing.T) { - assert := testify.New(t) - - s := createQueue() - s.Enqueue(&jobItem{username: "username1"}) - s.Enqueue(&jobItem{username: "username2"}) - s.Enqueue(&jobItem{username: "username3"}) - - assert.Equal(3, s.Size()) + s := newJobsQueue() + require.Equal(t, 1, s.Enqueue(&job{Username: "username1"})) + require.Equal(t, 2, s.Enqueue(&job{Username: "username2"})) + require.Equal(t, 3, s.Enqueue(&job{Username: "username3"})) }) t.Run("Dequeue", func(t *testing.T) { - assert := testify.New(t) + s := newJobsQueue() + s.Enqueue(&job{Username: "username1"}) + s.Enqueue(&job{Username: "username2"}) + s.Enqueue(&job{Username: "username3"}) + s.Enqueue(&job{Username: "username4"}) + s.Enqueue(&job{Username: "username5"}) - s := createQueue() - s.Enqueue(&jobItem{username: "username1"}) - s.Enqueue(&jobItem{username: "username2"}) - s.Enqueue(&jobItem{username: "username3"}) - s.Enqueue(&jobItem{username: "username4"}) + items, queueLen := s.Dequeue(2) + require.Len(t, items, 2) + require.Equal(t, 3, queueLen) + require.Equal(t, "username1", items[0].Username) + require.Equal(t, "username2", items[1].Username) - items := s.Dequeue(2) - assert.Len(items, 2) - assert.Equal("username1", items[0].username) - assert.Equal("username2", items[1].username) - assert.Equal(2, s.Size()) - - items = s.Dequeue(40) - assert.Len(items, 2) - assert.Equal("username3", items[0].username) - assert.Equal("username4", items[1].username) + items, queueLen = s.Dequeue(40) + require.Len(t, items, 3) + require.Equal(t, 0, queueLen) + require.Equal(t, "username3", items[0].Username) + require.Equal(t, "username4", items[1].Username) + require.Equal(t, "username5", items[2].Username) }) } -// This is really stupid test just to get 100% coverage on this package :) -func TestBatchUuidsProvider_forever(t *testing.T) { - testify.True(t, forever()) -} - type mojangUsernamesToUuidsRequestMock struct { mock.Mock } @@ -73,6 +59,24 @@ func (o *mojangUsernamesToUuidsRequestMock) UsernamesToUuids(usernames []string) return result, args.Error(1) } +type queueStrategyMock struct { + mock.Mock + ch chan *JobsIteration +} + +func (m *queueStrategyMock) Queue(job *job) { + m.Called(job) +} + +func (m *queueStrategyMock) GetJobs(abort context.Context) <-chan *JobsIteration { + m.Called(abort) + return m.ch +} + +func (m *queueStrategyMock) PushIteration(iteration *JobsIteration) { + m.ch <- iteration +} + type batchUuidsProviderGetUuidResult struct { Result *mojang.ProfileInfo Error error @@ -86,25 +90,21 @@ type batchUuidsProviderTestSuite struct { Emitter *mockEmitter MojangApi *mojangUsernamesToUuidsRequestMock - - Iterate func() - done func() - iterateChan chan bool } func (suite *batchUuidsProviderTestSuite) SetupTest() { suite.Emitter = &mockEmitter{} suite.Provider = &BatchUuidsProvider{ - Emitter: suite.Emitter, - IterationDelay: 0, - IterationSize: 10, + // Emitter: suite.Emitter, + // IterationDelay: 0, + // IterationSize: 10, } suite.iterateChan = make(chan bool) - forever = func() bool { - return <-suite.iterateChan - } + // forever = func() bool { + // return <-suite.iterateChan + // } suite.Iterate = func() { suite.iterateChan <- true