diff --git a/api/mojang/queue/broadcast.go b/api/mojang/queue/broadcast.go new file mode 100644 index 0000000..56c3b5e --- /dev/null +++ b/api/mojang/queue/broadcast.go @@ -0,0 +1,53 @@ +package queue + +import ( + "sync" + + "github.com/elyby/chrly/api/mojang" +) + +type broadcastMap struct { + lock sync.Mutex + listeners map[string][]chan *mojang.SignedTexturesResponse +} + +func newBroadcaster() *broadcastMap { + return &broadcastMap{ + listeners: make(map[string][]chan *mojang.SignedTexturesResponse), + } +} + +// Returns a boolean value, which will be true if the username passed didn't exist before +func (c *broadcastMap) AddListener(username string, resultChan chan *mojang.SignedTexturesResponse) bool { + c.lock.Lock() + defer c.lock.Unlock() + + val, alreadyHasSource := c.listeners[username] + if alreadyHasSource { + c.listeners[username] = append(val, resultChan) + return false + } + + c.listeners[username] = []chan *mojang.SignedTexturesResponse{resultChan} + + return true +} + +func (c *broadcastMap) BroadcastAndRemove(username string, result *mojang.SignedTexturesResponse) { + c.lock.Lock() + defer c.lock.Unlock() + + val, ok := c.listeners[username] + if !ok { + return + } + + for _, channel := range val { + go func(channel chan *mojang.SignedTexturesResponse) { + channel <- result + close(channel) + }(channel) + } + + delete(c.listeners, username) +} diff --git a/api/mojang/queue/broadcast_test.go b/api/mojang/queue/broadcast_test.go new file mode 100644 index 0000000..910ea8b --- /dev/null +++ b/api/mojang/queue/broadcast_test.go @@ -0,0 +1,75 @@ +package queue + +import ( + "github.com/elyby/chrly/api/mojang" + + testify "github.com/stretchr/testify/assert" + "testing" +) + +func TestBroadcastMap_GetOrAppend(t *testing.T) { + t.Run("first call when username didn't exist before should return true", func(t *testing.T) { + assert := testify.New(t) + + broadcaster := newBroadcaster() + channel := make(chan *mojang.SignedTexturesResponse) + isFirstListener := broadcaster.AddListener("mock", channel) + + assert.True(isFirstListener) + listeners, ok := broadcaster.listeners["mock"] + assert.True(ok) + assert.Len(listeners, 1) + assert.Equal(channel, listeners[0]) + }) + + t.Run("subsequent calls should return false", func(t *testing.T) { + assert := testify.New(t) + + broadcaster := newBroadcaster() + channel1 := make(chan *mojang.SignedTexturesResponse) + isFirstListener := broadcaster.AddListener("mock", channel1) + + assert.True(isFirstListener) + + channel2 := make(chan *mojang.SignedTexturesResponse) + isFirstListener = broadcaster.AddListener("mock", channel2) + + assert.False(isFirstListener) + + channel3 := make(chan *mojang.SignedTexturesResponse) + isFirstListener = broadcaster.AddListener("mock", channel3) + + assert.False(isFirstListener) + }) +} + +func TestBroadcastMap_BroadcastAndRemove(t *testing.T) { + t.Run("should broadcast to all listeners and remove the key", func(t *testing.T) { + assert := testify.New(t) + + broadcaster := newBroadcaster() + channel1 := make(chan *mojang.SignedTexturesResponse) + channel2 := make(chan *mojang.SignedTexturesResponse) + broadcaster.AddListener("mock", channel1) + broadcaster.AddListener("mock", channel2) + + result := &mojang.SignedTexturesResponse{Id: "mockUuid"} + broadcaster.BroadcastAndRemove("mock", result) + + assert.Equal(result, <-channel1) + assert.Equal(result, <-channel2) + + channel3 := make(chan *mojang.SignedTexturesResponse) + isFirstListener := broadcaster.AddListener("mock", channel3) + assert.True(isFirstListener) + }) + + t.Run("call on not exists username", func(t *testing.T) { + assert := testify.New(t) + + assert.NotPanics(func() { + broadcaster := newBroadcaster() + broadcaster.BroadcastAndRemove("mock", &mojang.SignedTexturesResponse{}) + }) + }) +} diff --git a/api/mojang/queue/jobs_structure.go b/api/mojang/queue/jobs_structure.go index 02d624d..987be3d 100644 --- a/api/mojang/queue/jobs_structure.go +++ b/api/mojang/queue/jobs_structure.go @@ -14,8 +14,8 @@ type jobItem struct { } type jobsQueue struct { + lock sync.Mutex items []*jobItem - lock sync.RWMutex } func (s *jobsQueue) New() *jobsQueue { diff --git a/api/mojang/queue/queue.go b/api/mojang/queue/queue.go index 2a330cf..5601609 100644 --- a/api/mojang/queue/queue.go +++ b/api/mojang/queue/queue.go @@ -20,20 +20,30 @@ type JobsQueue struct { onFirstCall sync.Once queue jobsQueue + broadcast *broadcastMap } func (ctx *JobsQueue) GetTexturesForUsername(username string) chan *mojang.SignedTexturesResponse { ctx.onFirstCall.Do(func() { ctx.queue.New() + ctx.broadcast = newBroadcaster() ctx.startQueue() }) - resultChan := make(chan *mojang.SignedTexturesResponse) - // TODO: prevent of adding the same username more than once - ctx.queue.Enqueue(&jobItem{username, resultChan}) - // TODO: return nil if processing takes more than 5 seconds + responseChan := make(chan *mojang.SignedTexturesResponse) + isFirstListener := ctx.broadcast.AddListener(username, responseChan) + if isFirstListener { + resultChan := make(chan *mojang.SignedTexturesResponse) + ctx.queue.Enqueue(&jobItem{username, resultChan}) + // TODO: return nil if processing takes more than 5 seconds - return resultChan + go func() { + result := <-resultChan + ctx.broadcast.BroadcastAndRemove(username, result) + }() + } + + return responseChan } func (ctx *JobsQueue) startQueue() { diff --git a/api/mojang/queue/queue_test.go b/api/mojang/queue/queue_test.go index d53b800..6f26292 100644 --- a/api/mojang/queue/queue_test.go +++ b/api/mojang/queue/queue_test.go @@ -3,6 +3,7 @@ package queue import ( "crypto/rand" "encoding/base64" + "time" "github.com/elyby/chrly/api/mojang" @@ -135,6 +136,64 @@ func (suite *QueueTestSuite) TestReceiveTexturesForMoreThan100Usernames() { suite.Iterate() } +func (suite *QueueTestSuite) TestReceiveTexturesForTheSameUsernames() { + suite.MojangApi.On("UsernameToUuids", []string{"maksimkurb"}).Once().Return([]*mojang.ProfileInfo{ + {Id: "0d252b7218b648bfb86c2ae476954d32", Name: "maksimkurb"}, + }, nil) + suite.MojangApi.On("UuidToTextures", "0d252b7218b648bfb86c2ae476954d32", true).Once().Return( + &mojang.SignedTexturesResponse{Id: "0d252b7218b648bfb86c2ae476954d32", Name: "maksimkurb"}, + nil, + ) + + resultChan1 := suite.Queue.GetTexturesForUsername("maksimkurb") + resultChan2 := suite.Queue.GetTexturesForUsername("maksimkurb") + + suite.Iterate() + + result1 := <-resultChan1 + result2 := <-resultChan2 + + if suite.Assert().NotNil(result1) { + suite.Assert().Equal("0d252b7218b648bfb86c2ae476954d32", result1.Id) + suite.Assert().Equal("maksimkurb", result1.Name) + + suite.Assert().Equal(result1, result2) + } +} + +func (suite *QueueTestSuite) TestReceiveTexturesForUsernameThatAlreadyProcessing() { + suite.MojangApi.On("UsernameToUuids", []string{"maksimkurb"}).Once().Return([]*mojang.ProfileInfo{ + {Id: "0d252b7218b648bfb86c2ae476954d32", Name: "maksimkurb"}, + }, nil) + suite.MojangApi.On("UuidToTextures", "0d252b7218b648bfb86c2ae476954d32", true). + Once(). + After(10*time.Millisecond). // Simulate long round trip + Return( + &mojang.SignedTexturesResponse{Id: "0d252b7218b648bfb86c2ae476954d32", Name: "maksimkurb"}, + nil, + ) + + resultChan1 := suite.Queue.GetTexturesForUsername("maksimkurb") + + // Note that for entire test there is only one iteration + suite.Iterate() + + // Let it meet delayed UuidToTextures request + time.Sleep(5 * time.Millisecond) + + resultChan2 := suite.Queue.GetTexturesForUsername("maksimkurb") + + result1 := <-resultChan1 + result2 := <-resultChan2 + + if suite.Assert().NotNil(result1) { + suite.Assert().Equal("0d252b7218b648bfb86c2ae476954d32", result1.Id) + suite.Assert().Equal("maksimkurb", result1.Name) + + suite.Assert().Equal(result1, result2) + } +} + func (suite *QueueTestSuite) TestDoNothingWhenNoTasks() { suite.MojangApi.On("UsernameToUuids", []string{"maksimkurb"}).Once().Return([]*mojang.ProfileInfo{}, nil)