#1: Add broadcaster structure to broadcast results of the same usernames

This commit is contained in:
ErickSkrauch 2019-04-20 03:23:49 +03:00
parent 8244351bb5
commit abea94a41f
5 changed files with 203 additions and 6 deletions

View File

@ -0,0 +1,53 @@
package queue
import (
"sync"
"github.com/elyby/chrly/api/mojang"
)
type broadcastMap struct {
lock sync.Mutex
listeners map[string][]chan *mojang.SignedTexturesResponse
}
func newBroadcaster() *broadcastMap {
return &broadcastMap{
listeners: make(map[string][]chan *mojang.SignedTexturesResponse),
}
}
// Returns a boolean value, which will be true if the username passed didn't exist before
func (c *broadcastMap) AddListener(username string, resultChan chan *mojang.SignedTexturesResponse) bool {
c.lock.Lock()
defer c.lock.Unlock()
val, alreadyHasSource := c.listeners[username]
if alreadyHasSource {
c.listeners[username] = append(val, resultChan)
return false
}
c.listeners[username] = []chan *mojang.SignedTexturesResponse{resultChan}
return true
}
func (c *broadcastMap) BroadcastAndRemove(username string, result *mojang.SignedTexturesResponse) {
c.lock.Lock()
defer c.lock.Unlock()
val, ok := c.listeners[username]
if !ok {
return
}
for _, channel := range val {
go func(channel chan *mojang.SignedTexturesResponse) {
channel <- result
close(channel)
}(channel)
}
delete(c.listeners, username)
}

View File

@ -0,0 +1,75 @@
package queue
import (
"github.com/elyby/chrly/api/mojang"
testify "github.com/stretchr/testify/assert"
"testing"
)
func TestBroadcastMap_GetOrAppend(t *testing.T) {
t.Run("first call when username didn't exist before should return true", func(t *testing.T) {
assert := testify.New(t)
broadcaster := newBroadcaster()
channel := make(chan *mojang.SignedTexturesResponse)
isFirstListener := broadcaster.AddListener("mock", channel)
assert.True(isFirstListener)
listeners, ok := broadcaster.listeners["mock"]
assert.True(ok)
assert.Len(listeners, 1)
assert.Equal(channel, listeners[0])
})
t.Run("subsequent calls should return false", func(t *testing.T) {
assert := testify.New(t)
broadcaster := newBroadcaster()
channel1 := make(chan *mojang.SignedTexturesResponse)
isFirstListener := broadcaster.AddListener("mock", channel1)
assert.True(isFirstListener)
channel2 := make(chan *mojang.SignedTexturesResponse)
isFirstListener = broadcaster.AddListener("mock", channel2)
assert.False(isFirstListener)
channel3 := make(chan *mojang.SignedTexturesResponse)
isFirstListener = broadcaster.AddListener("mock", channel3)
assert.False(isFirstListener)
})
}
func TestBroadcastMap_BroadcastAndRemove(t *testing.T) {
t.Run("should broadcast to all listeners and remove the key", func(t *testing.T) {
assert := testify.New(t)
broadcaster := newBroadcaster()
channel1 := make(chan *mojang.SignedTexturesResponse)
channel2 := make(chan *mojang.SignedTexturesResponse)
broadcaster.AddListener("mock", channel1)
broadcaster.AddListener("mock", channel2)
result := &mojang.SignedTexturesResponse{Id: "mockUuid"}
broadcaster.BroadcastAndRemove("mock", result)
assert.Equal(result, <-channel1)
assert.Equal(result, <-channel2)
channel3 := make(chan *mojang.SignedTexturesResponse)
isFirstListener := broadcaster.AddListener("mock", channel3)
assert.True(isFirstListener)
})
t.Run("call on not exists username", func(t *testing.T) {
assert := testify.New(t)
assert.NotPanics(func() {
broadcaster := newBroadcaster()
broadcaster.BroadcastAndRemove("mock", &mojang.SignedTexturesResponse{})
})
})
}

View File

@ -14,8 +14,8 @@ type jobItem struct {
} }
type jobsQueue struct { type jobsQueue struct {
lock sync.Mutex
items []*jobItem items []*jobItem
lock sync.RWMutex
} }
func (s *jobsQueue) New() *jobsQueue { func (s *jobsQueue) New() *jobsQueue {

View File

@ -20,20 +20,30 @@ type JobsQueue struct {
onFirstCall sync.Once onFirstCall sync.Once
queue jobsQueue queue jobsQueue
broadcast *broadcastMap
} }
func (ctx *JobsQueue) GetTexturesForUsername(username string) chan *mojang.SignedTexturesResponse { func (ctx *JobsQueue) GetTexturesForUsername(username string) chan *mojang.SignedTexturesResponse {
ctx.onFirstCall.Do(func() { ctx.onFirstCall.Do(func() {
ctx.queue.New() ctx.queue.New()
ctx.broadcast = newBroadcaster()
ctx.startQueue() ctx.startQueue()
}) })
responseChan := make(chan *mojang.SignedTexturesResponse)
isFirstListener := ctx.broadcast.AddListener(username, responseChan)
if isFirstListener {
resultChan := make(chan *mojang.SignedTexturesResponse) resultChan := make(chan *mojang.SignedTexturesResponse)
// TODO: prevent of adding the same username more than once
ctx.queue.Enqueue(&jobItem{username, resultChan}) ctx.queue.Enqueue(&jobItem{username, resultChan})
// TODO: return nil if processing takes more than 5 seconds // TODO: return nil if processing takes more than 5 seconds
return resultChan go func() {
result := <-resultChan
ctx.broadcast.BroadcastAndRemove(username, result)
}()
}
return responseChan
} }
func (ctx *JobsQueue) startQueue() { func (ctx *JobsQueue) startQueue() {

View File

@ -3,6 +3,7 @@ package queue
import ( import (
"crypto/rand" "crypto/rand"
"encoding/base64" "encoding/base64"
"time"
"github.com/elyby/chrly/api/mojang" "github.com/elyby/chrly/api/mojang"
@ -135,6 +136,64 @@ func (suite *QueueTestSuite) TestReceiveTexturesForMoreThan100Usernames() {
suite.Iterate() suite.Iterate()
} }
func (suite *QueueTestSuite) TestReceiveTexturesForTheSameUsernames() {
suite.MojangApi.On("UsernameToUuids", []string{"maksimkurb"}).Once().Return([]*mojang.ProfileInfo{
{Id: "0d252b7218b648bfb86c2ae476954d32", Name: "maksimkurb"},
}, nil)
suite.MojangApi.On("UuidToTextures", "0d252b7218b648bfb86c2ae476954d32", true).Once().Return(
&mojang.SignedTexturesResponse{Id: "0d252b7218b648bfb86c2ae476954d32", Name: "maksimkurb"},
nil,
)
resultChan1 := suite.Queue.GetTexturesForUsername("maksimkurb")
resultChan2 := suite.Queue.GetTexturesForUsername("maksimkurb")
suite.Iterate()
result1 := <-resultChan1
result2 := <-resultChan2
if suite.Assert().NotNil(result1) {
suite.Assert().Equal("0d252b7218b648bfb86c2ae476954d32", result1.Id)
suite.Assert().Equal("maksimkurb", result1.Name)
suite.Assert().Equal(result1, result2)
}
}
func (suite *QueueTestSuite) TestReceiveTexturesForUsernameThatAlreadyProcessing() {
suite.MojangApi.On("UsernameToUuids", []string{"maksimkurb"}).Once().Return([]*mojang.ProfileInfo{
{Id: "0d252b7218b648bfb86c2ae476954d32", Name: "maksimkurb"},
}, nil)
suite.MojangApi.On("UuidToTextures", "0d252b7218b648bfb86c2ae476954d32", true).
Once().
After(10*time.Millisecond). // Simulate long round trip
Return(
&mojang.SignedTexturesResponse{Id: "0d252b7218b648bfb86c2ae476954d32", Name: "maksimkurb"},
nil,
)
resultChan1 := suite.Queue.GetTexturesForUsername("maksimkurb")
// Note that for entire test there is only one iteration
suite.Iterate()
// Let it meet delayed UuidToTextures request
time.Sleep(5 * time.Millisecond)
resultChan2 := suite.Queue.GetTexturesForUsername("maksimkurb")
result1 := <-resultChan1
result2 := <-resultChan2
if suite.Assert().NotNil(result1) {
suite.Assert().Equal("0d252b7218b648bfb86c2ae476954d32", result1.Id)
suite.Assert().Equal("maksimkurb", result1.Name)
suite.Assert().Equal(result1, result2)
}
}
func (suite *QueueTestSuite) TestDoNothingWhenNoTasks() { func (suite *QueueTestSuite) TestDoNothingWhenNoTasks() {
suite.MojangApi.On("UsernameToUuids", []string{"maksimkurb"}).Once().Return([]*mojang.ProfileInfo{}, nil) suite.MojangApi.On("UsernameToUuids", []string{"maksimkurb"}).Once().Return([]*mojang.ProfileInfo{}, nil)