chrly/worker/worker.go

187 lines
4.7 KiB
Go

package worker
import (
"encoding/json"
"fmt"
"strconv"
"github.com/mono83/slf/wd"
"github.com/streadway/amqp"
"elyby/minecraft-skinsystem/interfaces"
"elyby/minecraft-skinsystem/model"
)
type Services struct {
Channel *amqp.Channel
SkinsRepo interfaces.SkinsRepository
AccountsAPI interfaces.AccountsAPI
Logger wd.Watchdog
}
const exchangeName string = "events"
const queueName string = "skinsystem-accounts-events"
func (service *Services) Run() error {
deliveryChannel, err := setupConsume(service.Channel)
if err != nil {
return err
}
forever := make(chan bool)
go func() {
for d := range deliveryChannel {
service.Logger.Debug("Incoming message with routing key " + d.RoutingKey)
var result bool = true
switch d.RoutingKey {
case "accounts.username-changed":
var event *model.UsernameChanged
json.Unmarshal(d.Body, &event)
result = service.HandleChangeUsername(event)
case "accounts.skin-changed":
var event *model.SkinChanged
json.Unmarshal(d.Body, &event)
result = service.HandleSkinChanged(event)
}
if result {
d.Ack(false)
} else {
d.Reject(true)
}
}
}()
<-forever
return nil
}
func (service *Services) HandleChangeUsername(event *model.UsernameChanged) bool {
if event.OldUsername == "" {
service.Logger.IncCounter("worker.change_username.empty_old_username", 1)
record := &model.Skin{
UserId: event.AccountId,
Username: event.NewUsername,
}
service.SkinsRepo.Save(record)
return true
}
record, err := service.SkinsRepo.FindByUserId(event.AccountId)
if err != nil {
service.Logger.IncCounter("worker.change_username.id_not_found", 1)
service.Logger.Warning("Cannot find user id. Trying to search.")
response, err := service.AccountsAPI.AccountInfo("id", strconv.Itoa(event.AccountId))
if err != nil {
service.Logger.IncCounter("worker.change_username.id_not_restored", 1)
service.Logger.Error(fmt.Sprintf("Cannot restore user info. %+v\n", err))
// TODO: логгировать в какой-нибудь Sentry, если там не 404
return true
}
service.Logger.IncCounter("worker.change_username.id_restored", 1)
fmt.Println("User info successfully restored.")
record = &model.Skin{
UserId: response.Id,
}
}
record.Username = event.NewUsername
service.SkinsRepo.Save(record)
service.Logger.IncCounter("worker.change_username.processed", 1)
return true
}
func (service *Services) HandleSkinChanged(event *model.SkinChanged) bool {
record, err := service.SkinsRepo.FindByUserId(event.AccountId)
if err != nil {
service.Logger.IncCounter("worker.skin_changed.id_not_found", 1)
service.Logger.Warning("Cannot find user id. Trying to search.")
response, err := service.AccountsAPI.AccountInfo("id", strconv.Itoa(event.AccountId))
if err != nil {
service.Logger.IncCounter("worker.skin_changed.id_not_restored", 1)
service.Logger.Error(fmt.Sprintf("Cannot restore user info. %+v\n", err))
// TODO: логгировать в какой-нибудь Sentry, если там не 404
return true
}
service.Logger.IncCounter("worker.skin_changed.id_restored", 1)
service.Logger.Info("User info successfully restored.")
record.UserId = response.Id
record.Username = response.Username
}
record.Uuid = event.Uuid
record.SkinId = event.SkinId
record.Hash = event.Hash
record.Is1_8 = event.Is1_8
record.IsSlim = event.IsSlim
record.Url = event.Url
record.MojangTextures = event.MojangTextures
record.MojangSignature = event.MojangSignature
service.SkinsRepo.Save(record)
service.Logger.IncCounter("worker.skin_changed.processed", 1)
return true
}
func setupConsume(channel *amqp.Channel) (<-chan amqp.Delivery, error) {
var err error
err = channel.ExchangeDeclare(
exchangeName, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
_, err = channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
err = channel.QueueBind(queueName, "accounts.username-changed", exchangeName, false, nil)
if err != nil {
return nil, err
}
err = channel.QueueBind(queueName, "accounts.skin-changed", exchangeName, false, nil)
if err != nil {
return nil, err
}
deliveryChannel, err := channel.Consume(
queueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}
return deliveryChannel, nil
}