chrly/api/mojang/queue/queue.go

210 lines
5.1 KiB
Go
Raw Normal View History

package queue
import (
"net"
2019-05-06 01:36:29 +05:30
"net/url"
"regexp"
2019-04-15 04:02:22 +05:30
"strings"
"sync"
"syscall"
2019-04-15 04:02:22 +05:30
"time"
"github.com/mono83/slf/wd"
"github.com/elyby/chrly/api/mojang"
)
var usernamesToUuids = mojang.UsernamesToUuids
var uuidToTextures = mojang.UuidToTextures
2019-05-06 01:36:29 +05:30
var uuidsQueueIterationDelay = time.Second
var forever = func() bool {
return true
}
// https://help.mojang.com/customer/portal/articles/928638
var allowedUsernamesRegex = regexp.MustCompile(`^[\w_]{3,16}$`)
type JobsQueue struct {
Storage Storage
Logger wd.Watchdog
onFirstCall sync.Once
queue jobsQueue
broadcast *broadcastMap
}
func (ctx *JobsQueue) GetTexturesForUsername(username string) chan *mojang.SignedTexturesResponse {
2019-04-27 04:16:15 +05:30
// TODO: convert username to lower case
ctx.onFirstCall.Do(func() {
ctx.queue.New()
ctx.broadcast = newBroadcaster()
ctx.startQueue()
2019-04-15 04:02:22 +05:30
})
responseChan := make(chan *mojang.SignedTexturesResponse)
if !allowedUsernamesRegex.MatchString(username) {
ctx.Logger.IncCounter("mojang_textures.invalid_username", 1)
go func() {
responseChan <- nil
close(responseChan)
}()
return responseChan
}
2019-04-20 22:05:37 +05:30
ctx.Logger.IncCounter("mojang_textures.request", 1)
uuid, err := ctx.Storage.GetUuid(username)
if err == nil && uuid == "" {
ctx.Logger.IncCounter("mojang_textures.usernames.cache_hit_nil", 1)
2019-04-20 22:05:37 +05:30
go func() {
responseChan <- nil
2019-04-20 22:05:37 +05:30
close(responseChan)
}()
return responseChan
}
isFirstListener := ctx.broadcast.AddListener(username, responseChan)
if isFirstListener {
start := time.Now()
// TODO: respond nil if processing takes more than 5 seconds
resultChan := make(chan *mojang.SignedTexturesResponse)
if uuid == "" {
ctx.Logger.IncCounter("mojang_textures.usernames.queued", 1)
ctx.queue.Enqueue(&jobItem{username, resultChan})
} else {
ctx.Logger.IncCounter("mojang_textures.usernames.cache_hit", 1)
go func() {
resultChan <- ctx.getTextures(uuid)
}()
}
go func() {
result := <-resultChan
2019-04-20 22:05:37 +05:30
close(resultChan)
ctx.broadcast.BroadcastAndRemove(username, result)
ctx.Logger.RecordTimer("mojang_textures.result_time", time.Since(start))
}()
} else {
ctx.Logger.IncCounter("mojang_textures.already_in_queue", 1)
}
return responseChan
}
func (ctx *JobsQueue) startQueue() {
2019-04-15 04:02:22 +05:30
go func() {
2019-05-06 01:36:29 +05:30
time.Sleep(uuidsQueueIterationDelay)
for forever() {
2019-04-15 04:02:22 +05:30
start := time.Now()
ctx.queueRound()
elapsed := time.Since(start)
ctx.Logger.RecordTimer("mojang_textures.usernames.round_time", elapsed)
2019-05-06 01:36:29 +05:30
time.Sleep(uuidsQueueIterationDelay)
2019-04-15 04:02:22 +05:30
}
}()
}
func (ctx *JobsQueue) queueRound() {
if ctx.queue.IsEmpty() {
2019-04-15 04:02:22 +05:30
return
}
queueSize := ctx.queue.Size()
jobs := ctx.queue.Dequeue(100)
ctx.Logger.UpdateGauge("mojang_textures.usernames.iteration_size", int64(len(jobs)))
ctx.Logger.UpdateGauge("mojang_textures.usernames.queue_size", int64(queueSize-len(jobs)))
2019-04-15 04:02:22 +05:30
var usernames []string
for _, job := range jobs {
usernames = append(usernames, job.Username)
}
profiles, err := usernamesToUuids(usernames)
if err != nil {
2019-05-06 01:36:29 +05:30
ctx.handleResponseError(err, "usernames")
for _, job := range jobs {
job.RespondTo <- nil
}
2019-04-15 04:02:22 +05:30
return
}
for _, job := range jobs {
go func(job *jobItem) {
2019-04-15 04:02:22 +05:30
var uuid string
2019-05-06 01:36:29 +05:30
// The profiles in the response are not ordered, so we must search each username over full array
2019-04-15 04:02:22 +05:30
for _, profile := range profiles {
if strings.EqualFold(job.Username, profile.Name) {
uuid = profile.Id
break
}
}
2019-05-06 19:42:37 +05:30
_ = ctx.Storage.StoreUuid(job.Username, uuid)
if uuid == "" {
job.RespondTo <- nil
ctx.Logger.IncCounter("mojang_textures.usernames.uuid_miss", 1)
} else {
job.RespondTo <- ctx.getTextures(uuid)
ctx.Logger.IncCounter("mojang_textures.usernames.uuid_hit", 1)
2019-04-15 04:02:22 +05:30
}
}(job)
2019-04-15 04:02:22 +05:30
}
}
func (ctx *JobsQueue) getTextures(uuid string) *mojang.SignedTexturesResponse {
existsTextures, err := ctx.Storage.GetTextures(uuid)
if err == nil {
ctx.Logger.IncCounter("mojang_textures.textures.cache_hit", 1)
return existsTextures
}
ctx.Logger.IncCounter("mojang_textures.textures.request", 1)
start := time.Now()
result, err := uuidToTextures(uuid, true)
ctx.Logger.RecordTimer("mojang_textures.textures.request_time", time.Since(start))
if err != nil {
2019-05-06 01:36:29 +05:30
ctx.handleResponseError(err, "textures")
}
2019-05-06 01:36:29 +05:30
// Mojang can respond with an error, but count it as a hit, so store result even if the textures is nil
ctx.Storage.StoreTextures(uuid, result)
return result
}
2019-05-06 01:36:29 +05:30
func (ctx *JobsQueue) handleResponseError(err error, threadName string) {
ctx.Logger.Debug(":name: Got response error :err", wd.NameParam(threadName), wd.ErrParam(err))
switch err.(type) {
case mojang.ResponseError:
if _, ok := err.(*mojang.TooManyRequestsError); ok {
2019-05-06 01:36:29 +05:30
ctx.Logger.Warning(":name: Got 429 Too Many Requests :err", wd.NameParam(threadName), wd.ErrParam(err))
}
return
case net.Error:
if err.(net.Error).Timeout() {
return
}
2019-05-06 01:36:29 +05:30
if _, ok := err.(*url.Error); ok {
return
}
if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
return
}
if err == syscall.ECONNREFUSED {
return
}
}
2019-05-06 01:36:29 +05:30
ctx.Logger.Emergency(":name: Unknown Mojang response error: :err", wd.NameParam(threadName), wd.ErrParam(err))
}