// Source: chrly/mojangtextures/batch_uuids_provider.go
// (listed by the repository viewer as 250 lines, 4.9 KiB, Go)

package mojangtextures
import (
"context"
"strings"
"sync"
"time"
"github.com/elyby/chrly/api/mojang"
)
// jobResult is the outcome delivered to a caller waiting on a job's RespondChan.
type jobResult struct {
// Profile is the matched profile. performRequest leaves it nil when the batch
// response contains no profile whose name matches the requested username, or
// when the batch request failed.
Profile *mojang.ProfileInfo
// Error is the error returned by the batch request, if any.
Error error
}
// job is a single pending username lookup.
type job struct {
// Username is the name to resolve.
Username string
// RespondChan is where performRequest sends the job's jobResult; it closes the
// channel right after sending.
RespondChan chan *jobResult
}
// jobsQueue is a first-in, first-out list of pending jobs guarded by a mutex.
type jobsQueue struct {
// lock guards items.
lock sync.Mutex
// items holds the queued jobs in insertion order.
items []*job
}
// newJobsQueue creates a queue that holds no pending jobs yet.
func newJobsQueue() *jobsQueue {
	q := new(jobsQueue)
	q.items = make([]*job, 0)

	return q
}
// Enqueue appends job to the tail of the queue and reports how many jobs are
// queued once it has been added.
func (q *jobsQueue) Enqueue(job *job) int {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.items = append(q.items, job)
	queued := len(q.items)

	return queued
}
// Dequeue removes up to n jobs from the head of the queue and returns them
// together with the number of jobs still waiting. Asking for more jobs than
// are queued returns everything that is queued; a non-positive n returns no
// jobs and leaves the queue untouched.
func (s *jobsQueue) Dequeue(n int) ([]*job, int) {
	s.lock.Lock()
	defer s.lock.Unlock()

	l := len(s.items)
	if n > l {
		n = l
	}
	if n < 0 {
		// A negative count would make the slice expressions below panic.
		n = 0
	}

	// Hand out a detached copy: a sub-slice of s.items would share its backing
	// array with the queue, so an append by the caller could overwrite jobs
	// that are still queued.
	items := make([]*job, n)
	copy(items, s.items[:n])

	// Drop the queue's references to the dequeued jobs so the backing array
	// does not keep them alive after they have been processed.
	for i := range s.items[:n] {
		s.items[i] = nil
	}

	if n == l {
		// Everything was taken: keep the (now cleared) backing array for reuse.
		s.items = s.items[:0]
	} else {
		s.items = s.items[n:]
	}

	return items, l - n
}
var usernamesToUuids = mojang.UsernamesToUuids
2020-01-03 03:34:23 +05:30
// JobsIteration is one batch of jobs handed from a strategy to the provider.
type JobsIteration struct {
// Jobs are the jobs dequeued for this batch; the slice may be empty.
Jobs []*job
// Queue is the number of jobs left in the strategy's queue after this batch
// was taken.
Queue int
// c, when non-nil, is closed by Done to signal that the batch was processed.
c chan struct{}
}
func (j *JobsIteration) Done() {
if j.c != nil {
close(j.c)
}
2020-01-03 03:34:23 +05:30
}
// BatchUuidsProviderStrategy decides when queued jobs are grouped into batches
// and handed to the provider.
type BatchUuidsProviderStrategy interface {
// Queue registers a job to be included in a future batch.
Queue(job *job)
// GetJobs returns the channel on which batches are delivered. The
// implementations in this file close the channel once abort is done.
GetJobs(abort context.Context) <-chan *JobsIteration
}
// PeriodicStrategy hands out a batch of queued jobs after every Delay and waits
// for each batch to be marked done before starting the next delay.
type PeriodicStrategy struct {
// Delay is the pause before each batch is dequeued.
Delay time.Duration
// Batch is the maximum number of jobs taken per batch.
Batch int
// queue holds the jobs waiting for a batch.
queue *jobsQueue
// NOTE(review): done is not referenced anywhere in this file — confirm
// whether it is still needed.
done chan struct{}
}
// NewPeriodicStrategy builds a PeriodicStrategy that waits delay between
// batches and takes at most batch jobs per batch, starting with an empty queue.
func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy {
	strategy := new(PeriodicStrategy)
	strategy.Delay = delay
	strategy.Batch = batch
	strategy.queue = newJobsQueue()

	return strategy
}
// Queue adds job to the pending queue so that a later batch picks it up.
func (s *PeriodicStrategy) Queue(job *job) {
	// The resulting queue length is of no interest to this strategy.
	_ = s.queue.Enqueue(job)
}
// GetJobs starts a goroutine that, after every Delay, dequeues up to Batch jobs
// and delivers them on the returned channel. The next delay only starts once
// the consumer has called Done on the delivered iteration.
//
// The goroutine stops and the channel is closed as soon as abort is done, even
// if it is in the middle of delivering an iteration or waiting for one to be
// marked done; otherwise a consumer that has already gone away would leave the
// goroutine blocked forever.
func (s *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
	ch := make(chan *JobsIteration)
	go func() {
		defer close(ch)

		// One timer is reused for every round instead of allocating a new one
		// per iteration.
		timer := time.NewTimer(s.Delay)
		defer timer.Stop()

		for {
			select {
			case <-abort.Done():
				return
			case <-timer.C:
			}

			jobs, queueLen := s.queue.Dequeue(s.Batch)
			jobDoneChan := make(chan struct{})

			// Deliver the batch, but give up if nobody will ever receive it.
			select {
			case <-abort.Done():
				return
			case ch <- &JobsIteration{jobs, queueLen, jobDoneChan}:
			}

			// Wait for the consumer to finish the batch before the next delay.
			select {
			case <-abort.Done():
				return
			case <-jobDoneChan:
			}

			// timer.C was drained above, so Reset is safe here.
			timer.Reset(s.Delay)
		}
	}()

	return ch
}
// FullBusStrategy hands out a batch either when Delay elapses or as soon as
// Queue reports that the queue length reached a multiple of Batch, whichever
// happens first.
type FullBusStrategy struct {
// Delay is the longest time to wait before handing out a batch.
Delay time.Duration
// Batch is the maximum number of jobs taken per batch.
Batch int
// queue holds the jobs waiting for a batch.
queue *jobsQueue
// busIsFull is signalled by Queue when the queue length becomes a multiple
// of Batch.
busIsFull chan bool
}
// NewFullBusStrategy builds a FullBusStrategy with the given maximum delay and
// batch size, an empty queue and an unbuffered "bus is full" signal channel.
func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy {
	strategy := new(FullBusStrategy)
	strategy.Delay = delay
	strategy.Batch = batch
	strategy.queue = newJobsQueue()
	strategy.busIsFull = make(chan bool)

	return strategy
}
// Queue adds job to the pending queue. When the queue length becomes a multiple
// of Batch it signals busIsFull; the send blocks until the signal is received.
func (s *FullBusStrategy) Queue(job *job) {
	queued := s.queue.Enqueue(job)
	if queued%s.Batch != 0 {
		return
	}

	s.busIsFull <- true
}
// GetJobs starts a goroutine that delivers a batch on the returned channel
// whenever Delay has elapsed or Queue signals that the bus is full, whichever
// comes first. The channel is closed once abort is done.
//
// Formally, this describes the logic of a minibus driver xD: leave when the
// timetable says so, or earlier if every seat is taken.
func (s *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
	ch := make(chan *JobsIteration)
	go func() {
		for {
			// A fresh timer per round: the delay restarts after every departure.
			t := time.NewTimer(s.Delay)
			select {
			case <-abort.Done():
				// Release the pending timer instead of leaving it to fire later.
				t.Stop()
				close(ch)
				return
			case <-t.C:
				s.sendJobs(ch)
			case <-s.busIsFull:
				t.Stop()
				s.sendJobs(ch)
			}
		}
	}()

	return ch
}
// sendJobs dequeues up to Batch jobs and delivers them on ch as one iteration.
// The iteration carries no completion channel, so calling Done on it does
// nothing. The send blocks until the iteration is received.
func (s *FullBusStrategy) sendJobs(ch chan *JobsIteration) {
	batch, remaining := s.queue.Dequeue(s.Batch)
	iteration := &JobsIteration{
		Jobs:  batch,
		Queue: remaining,
	}

	ch <- iteration
}
// BatchUuidsProvider resolves usernames to profiles by queueing each lookup
// with a strategy and answering it from a batched request.
type BatchUuidsProvider struct {
// context is passed to the strategy's GetJobs and stops the consumer loop
// started by startQueue once it is done.
context context.Context
// emitter receives the "queued", "round" and "result" events.
emitter Emitter
// strategy decides when queued jobs are grouped into batches.
strategy BatchUuidsProviderStrategy
// onFirstCall makes sure startQueue runs only once, on the first GetUuid call.
onFirstCall sync.Once
}
// NewBatchUuidsProvider builds a provider bound to the given context, batching
// strategy and event emitter. No goroutine is started here; the queue consumer
// is started by the first GetUuid call.
func NewBatchUuidsProvider(
	context context.Context,
	strategy BatchUuidsProviderStrategy,
	emitter Emitter,
) *BatchUuidsProvider {
	provider := new(BatchUuidsProvider)
	provider.context = context
	provider.emitter = emitter
	provider.strategy = strategy

	return provider
}
// GetUuid queues a lookup for username and blocks until the batch containing it
// has been answered. It returns the matching profile, or a nil profile when the
// response has no profile with that name, or the error of the batch request.
//
// If the provider's context is done before the job is answered, GetUuid returns
// the context's error instead of waiting forever: the consumer loop stops on
// cancellation, so nobody would answer the job any more.
func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) {
	ctx.onFirstCall.Do(ctx.startQueue)

	// Buffered for one result so that the sender never blocks on a caller that
	// has already given up because of cancellation.
	resultChan := make(chan *jobResult, 1)
	ctx.strategy.Queue(&job{username, resultChan})
	ctx.emitter.Emit("mojang_textures:batch_uuids_provider:queued", username)

	select {
	case result := <-resultChan:
		return result.Profile, result.Error
	case <-ctx.context.Done():
		return nil, ctx.context.Err()
	}
}
// startQueue launches the consumer goroutine that receives job iterations from
// the strategy and processes each one in its own goroutine. It returns only
// after the strategy's GetJobs has been called. The consumer stops when the
// provider's context is done or when the strategy closes its channel.
func (ctx *BatchUuidsProvider) startQueue() {
	// This synchronization chan is used to ensure that strategy's jobs provider
	// will be initialized before any job will be scheduled
	ready := make(chan struct{})
	go func() {
		jobsChan := ctx.strategy.GetJobs(ctx.context)
		close(ready)
		for {
			select {
			case <-ctx.context.Done():
				return
			case iteration, ok := <-jobsChan:
				if !ok {
					// The strategy closed the channel. A receive from a closed
					// channel yields a nil iteration, which must not reach
					// performRequest. This can win the select even when the
					// context is already done.
					return
				}
				go func() {
					ctx.performRequest(iteration)
					iteration.Done()
				}()
			}
		}
	}()

	<-ready
}
// performRequest resolves every job of the iteration with a single batch
// lookup. It emits a "round" event with the usernames and the remaining queue
// length, and stops there if the iteration is empty. Otherwise it runs the
// lookup, emits a "result" event, and answers each job on its RespondChan
// before closing that channel.
func (p *BatchUuidsProvider) performRequest(iteration *JobsIteration) {
	pending := iteration.Jobs

	usernames := make([]string, 0, len(pending))
	for _, j := range pending {
		usernames = append(usernames, j.Username)
	}

	p.emitter.Emit("mojang_textures:batch_uuids_provider:round", usernames, iteration.Queue)
	if len(usernames) == 0 {
		return
	}

	profiles, err := usernamesToUuids(usernames)
	p.emitter.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err)

	for _, j := range pending {
		// On failure every job gets the same error and no profile.
		response := &jobResult{Error: err}
		if err == nil {
			// The response is not ordered like the request, so each username is
			// searched for (case-insensitively) across all returned profiles.
			for _, candidate := range profiles {
				if strings.EqualFold(j.Username, candidate.Name) {
					response.Profile = candidate
					break
				}
			}
		}

		j.RespondChan <- response
		close(j.RespondChan)
	}
}