From 78917a70d36b6213cb8ad872099f36aa81721481 Mon Sep 17 00:00:00 2001 From: ErickSkrauch Date: Thu, 17 Aug 2017 02:47:35 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A7=D0=B0=D1=81=D1=82=D0=B8=D1=87=D0=BD?= =?UTF-8?q?=D0=BE=20=D0=B2=D0=BE=D1=81=D1=81=D1=82=D0=B0=D0=BD=D0=BE=D0=B2?= =?UTF-8?q?=D0=BB=D0=B5=D0=BD=D0=B0=20=D0=BB=D0=BE=D0=B3=D0=B8=D0=BA=D0=B0?= =?UTF-8?q?=20AMQP=20=D0=B2=D0=BE=D1=80=D0=BA=D0=B5=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 23 +++++ bootstrap/bootstrap.go | 34 ++++++++ cmd/amqpWorker.go | 63 ++++++++++++++ db/redis.go | 72 +++++++++++----- model/events.go | 20 +++++ repositories/skins.go | 5 +- worker/worker.go | 188 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 382 insertions(+), 23 deletions(-) create mode 100644 cmd/amqpWorker.go create mode 100644 model/events.go create mode 100644 worker/worker.go diff --git a/README.md b/README.md index 45ba303..c81d792 100644 --- a/README.md +++ b/README.md @@ -36,3 +36,26 @@ docker-compose rm -f app # Удаляем конейтнер docker-compose build app # Запускаем билд по новой docker-compose up -d app # Поднимаем свежесобранный контейнер обратно ``` + +### Шорткаты для разработки + +Потом это надо преобразовать в нормальные доки. + +Run Redis: + +```sh +docker run --rm \ +-p 6379:6379 \ +redis:3.0-alpine +``` + +Run RabbitMQ: + +```sh +docker run --rm \ +-p 5672:5672 \ +-e RABBITMQ_DEFAULT_USER=ely-skinsystem-app \ +-e RABBITMQ_DEFAULT_PASS=ely-skinsystem-app-password \ +-e RABBITMQ_DEFAULT_VHOST=/ely \ +rabbitmq:3.6 +``` diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 9c70eef..1c13c0e 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -1,12 +1,15 @@ package bootstrap import ( + "fmt" + "net/url" "os" "github.com/mono83/slf/rays" "github.com/mono83/slf/recievers/ansi" "github.com/mono83/slf/recievers/statsd" "github.com/mono83/slf/wd" + "github.com/streadway/amqp" ) func CreateLogger(statsdAddr string) (wd.Watchdog, error) { @@ -28,3 +31,34 @@ func CreateLogger(statsdAddr string) (wd.Watchdog, error) { return wd.New("", "").WithParams(rays.Host), nil } + +type RabbitMQConfig struct { + Username string + Password string + Host string + Port int + Vhost string +} + +func CreateRabbitMQChannel(config *RabbitMQConfig) (*amqp.Channel, error) { + addr := fmt.Sprintf( + "amqp://%s:%s@%s:%d/%s", + config.Username, + config.Password, + config.Host, + config.Port, + url.PathEscape(config.Vhost), + ) + + rabbitConnection, err := amqp.Dial(addr) + if err != nil { + return nil, err + } + + rabbitChannel, err := rabbitConnection.Channel() + if err != nil { + return nil, err + } + + return rabbitChannel, nil +} diff --git a/cmd/amqpWorker.go b/cmd/amqpWorker.go new file mode 100644 index 0000000..8cc1d7b --- /dev/null +++ b/cmd/amqpWorker.go @@ -0,0 +1,63 @@ +package cmd + +import ( + "fmt" + "log" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "elyby/minecraft-skinsystem/bootstrap" + "elyby/minecraft-skinsystem/db" + "elyby/minecraft-skinsystem/worker" +) + +var amqpWorkerCmd = &cobra.Command{ + Use: "amqp-worker", + Short: "Launches a worker which listens to events and processes them", + Run: func(cmd *cobra.Command, args []string) { + logger, err := bootstrap.CreateLogger(viper.GetString("statsd.addr")) + if err != nil { + log.Fatal(fmt.Printf("Cannot initialize logger: %v", err)) + } + logger.Info("Logger successfully initialized") + + storageFactory := db.StorageFactory{Config: viper.GetViper()} + + logger.Info("Initializing skins repository") + skinsRepo, err := storageFactory.CreateFactory("redis").CreateSkinsRepository() + if err != nil { + logger.Emergency(fmt.Sprintf("Error on creating skins repo: %+v", err)) + return + } + logger.Info("Skins repository successfully initialized") + + logger.Info("Initializing AMQP connection") + amqpChannel, err := bootstrap.CreateRabbitMQChannel(&bootstrap.RabbitMQConfig{ + Host: viper.GetString("amqp.host"), + Port: viper.GetInt("amqp.port"), + Username: viper.GetString("amqp.username"), + Password: viper.GetString("amqp.password"), + Vhost: viper.GetString("amqp.vhost"), + }) + if err != nil { + logger.Emergency(fmt.Sprintf("Error on connecting AMQP: %+v", err)) + return + } + logger.Info("AMQP connection successfully initialized") + + services := &worker.Services{ + Logger: logger, + Channel: amqpChannel, + SkinsRepo: skinsRepo, + } + + if err := services.Run(); err != nil { + logger.Error(fmt.Sprintf("Cannot initialize worker: %+v", err)) + } + }, +} + +func init() { + RootCmd.AddCommand(amqpWorkerCmd) +} diff --git a/db/redis.go b/db/redis.go index 72060c0..3c74675 100644 --- a/db/redis.go +++ b/db/redis.go @@ -96,42 +96,44 @@ type redisDb struct { const accountIdToUsernameKey string = "hash:username-to-account-id" -func (db *redisDb) FindByUsername(username string) (model.Skin, error) { - var record model.Skin +func (db *redisDb) FindByUsername(username string) (*model.Skin, error) { if username == "" { - return record, &SkinNotFoundError{username} + return nil, &SkinNotFoundError{username} } redisKey := buildKey(username) response := db.conn.Cmd("GET", redisKey) if response.IsType(redis.Nil) { - return record, &SkinNotFoundError{username} + return nil, &SkinNotFoundError{username} } encodedResult, err := response.Bytes() - if err == nil { - result, err := zlibDecode(encodedResult) - if err != nil { - log.Println("Cannot uncompress zlib for key " + redisKey) // TODO: replace with valid error - return record, err - } - - err = json.Unmarshal(result, &record) - if err != nil { - log.Println("Cannot decode record data for key" + redisKey) // TODO: replace with valid error - return record, nil - } - - record.OldUsername = record.Username + if err != nil { + return nil, err } - return record, nil + result, err := zlibDecode(encodedResult) + if err != nil { + log.Println("Cannot uncompress zlib for key " + redisKey) // TODO: replace with valid error + return nil, err + } + + var skin *model.Skin + err = json.Unmarshal(result, &skin) + if err != nil { + log.Println("Cannot decode record data for key" + redisKey) // TODO: replace with valid error + return nil, nil + } + + skin.OldUsername = skin.Username + + return skin, nil } -func (db *redisDb) FindByUserId(id int) (model.Skin, error) { +func (db *redisDb) FindByUserId(id int) (*model.Skin, error) { response := db.conn.Cmd("HGET", accountIdToUsernameKey, id) if response.IsType(redis.Nil) { - return model.Skin{}, SkinNotFoundError{"unknown"} + return nil, SkinNotFoundError{"unknown"} } username, _ := response.Str() @@ -139,6 +141,34 @@ func (db *redisDb) FindByUserId(id int) (model.Skin, error) { return db.FindByUsername(username) } +func (db *redisDb) Save(skin *model.Skin) error { + conn := db.conn + if poolConn, isPool := conn.(*pool.Pool); isPool { + conn, _ = poolConn.Get() + } + + conn.Cmd("MULTI") + + // Если пользователь сменил ник, то мы должны удать его ключ + if skin.OldUsername != "" && skin.OldUsername != skin.Username { + conn.Cmd("DEL", buildKey(skin.OldUsername)) + } + + // Если это новая запись или если пользователь сменил ник, то обновляем значение в хэш-таблице + if skin.OldUsername != "" || skin.OldUsername != skin.Username { + conn.Cmd("HSET", accountIdToUsernameKey, skin.UserId, skin.Username) + } + + str, _ := json.Marshal(skin) + conn.Cmd("SET", buildKey(skin.Username), zlibEncode(str)) + + conn.Cmd("EXEC") + + skin.OldUsername = skin.Username + + return nil +} + func buildKey(username string) string { return "username:" + strings.ToLower(username) } diff --git a/model/events.go b/model/events.go new file mode 100644 index 0000000..1f66000 --- /dev/null +++ b/model/events.go @@ -0,0 +1,20 @@ +package model + +type UsernameChanged struct { + AccountId int `json:"accountId"` + OldUsername string `json:"oldUsername"` + NewUsername string `json:"newUsername"` +} + +type SkinChanged struct { + AccountId int `json:"userId"` + Uuid string `json:"uuid"` + SkinId int `json:"skinId"` + OldSkinId int `json:"oldSkinId"` + Hash string `json:"hash"` + Is1_8 bool `json:"is1_8"` + IsSlim bool `json:"isSlim"` + Url string `json:"url"` + MojangTextures string `json:"mojangTextures"` + MojangSignature string `json:"mojangSignature"` +} diff --git a/repositories/skins.go b/repositories/skins.go index 9c8ccb3..c5b96fb 100644 --- a/repositories/skins.go +++ b/repositories/skins.go @@ -3,6 +3,7 @@ package repositories import "elyby/minecraft-skinsystem/model" type SkinsRepository interface { - FindByUsername(username string) (model.Skin, error) - FindByUserId(id int) (model.Skin, error) + FindByUsername(username string) (*model.Skin, error) + FindByUserId(id int) (*model.Skin, error) + Save(skin *model.Skin) error } diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..bbdfcbb --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,188 @@ +package worker + +import ( + "encoding/json" + + "github.com/mono83/slf/wd" + "github.com/streadway/amqp" + + "elyby/minecraft-skinsystem/model" + "elyby/minecraft-skinsystem/repositories" +) + +type Services struct { + Channel *amqp.Channel + SkinsRepo repositories.SkinsRepository + Logger wd.Watchdog +} + +const exchangeName string = "events" +const queueName string = "skinsystem-accounts-events" + +func (service *Services) Run() error { + deliveryChannel, err := setupConsume(service.Channel) + if err != nil { + return err + } + + forever := make(chan bool) + go func() { + for d := range deliveryChannel { + service.Logger.Debug("Incoming message with routing key " + d.RoutingKey) + var result bool = true + switch d.RoutingKey { + case "accounts.username-changed": + var event *model.UsernameChanged + json.Unmarshal(d.Body, &event) + result = service.HandleChangeUsername(event) + case "accounts.skin-changed": + var event *model.SkinChanged + json.Unmarshal(d.Body, &event) + result = service.HandleSkinChanged(event) + } + + if result { + d.Ack(false) + } else { + d.Reject(true) + } + } + }() + <-forever + + return nil +} + +func (service *Services) HandleChangeUsername(event *model.UsernameChanged) bool { + if event.OldUsername == "" { + service.Logger.IncCounter("worker.change_username.empty_old_username", 1) + record := &model.Skin{ + UserId: event.AccountId, + Username: event.NewUsername, + } + + service.SkinsRepo.Save(record) + + return true + } + + record, err := service.SkinsRepo.FindByUserId(event.AccountId) + if err != nil { + /* + // TODO: вернуть логику восстановления информации об аккаунте + service.Logger.IncCounter("worker.change_username.id_not_found", 1) + service.Logger.Warning("Cannot find user id. Trying to search.") + response, err := getById(event.AccountId) + if err != nil { + service.Logger.IncCounter("worker.change_username.id_not_restored", 1) + service.Logger.Error("Cannot restore user info. %T\n", err) + // TODO: логгировать в какой-нибудь Sentry, если там не 404 + return true + } + + service.Logger.IncCounter("worker.change_username.id_restored", 1) + fmt.Println("User info successfully restored.") + record = &event.Skin{ + UserId: response.Id, + } + */ + } + + record.Username = event.NewUsername + service.SkinsRepo.Save(record) + + service.Logger.IncCounter("worker.change_username.processed", 1) + + return true +} + +func (service *Services) HandleSkinChanged(event *model.SkinChanged) bool { + record, err := service.SkinsRepo.FindByUserId(event.AccountId) + if err != nil { + service.Logger.IncCounter("worker.skin_changed.id_not_found", 1) + service.Logger.Warning("Cannot find user id. Trying to search.") + /* + // TODO: вернуть логику восстановления информации об аккаунте + response, err := getById(event.AccountId) + if err != nil { + services.Logger.IncCounter("worker.skin_changed.id_not_restored", 1) + fmt.Printf("Cannot restore user info. %T\n", err) + // TODO: логгировать в какой-нибудь Sentry, если там не 404 + return true + } + + services.Logger.IncCounter("worker.skin_changed.id_restored", 1) + fmt.Println("User info successfully restored.") + record.UserId = response.Id + record.Username = response.Username + */ + } + + record.Uuid = event.Uuid + record.SkinId = event.SkinId + record.Hash = event.Hash + record.Is1_8 = event.Is1_8 + record.IsSlim = event.IsSlim + record.Url = event.Url + record.MojangTextures = event.MojangTextures + record.MojangSignature = event.MojangSignature + + service.SkinsRepo.Save(record) + + service.Logger.IncCounter("worker.skin_changed.processed", 1) + + return true +} + +func setupConsume(channel *amqp.Channel) (<-chan amqp.Delivery, error) { + var err error + err = channel.ExchangeDeclare( + exchangeName, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, err + } + + _, err = channel.QueueDeclare( + queueName, // name + true, // durable + false, // delete when usused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, err + } + + err = channel.QueueBind(queueName, "accounts.username-changed", exchangeName, false, nil) + if err != nil { + return nil, err + } + + err = channel.QueueBind(queueName, "accounts.skin-changed", exchangeName, false, nil) + if err != nil { + return nil, err + } + + deliveryChannel, err := channel.Consume( + queueName, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, err + } + + return deliveryChannel, nil +}