mirror of
				https://github.com/elyby/chrly.git
				synced 2025-05-31 14:11:51 +05:30 
			
		
		
		
	Merge branch '4.5.0'
This commit is contained in:
		
							
								
								
									
										10
									
								
								CHANGELOG.md
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								CHANGELOG.md
									
									
									
									
									
								
							| @@ -5,6 +5,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), | ||||
| and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). | ||||
|  | ||||
| ## [Unreleased] - xxxx-xx-xx | ||||
| ### Added | ||||
| - [#24](https://github.com/elyby/chrly/issues/24): Implemented a new strategy for the queue in the batch provider of | ||||
|   Mojang UUIDs: `full-bus`. | ||||
| - New configuration param `QUEUE_STRATEGY` with the default value `periodic`. | ||||
| - New configuration params: `MOJANG_API_BASE_URL` and `MOJANG_SESSION_SERVER_BASE_URL`, that allow you to spoof | ||||
|   Mojang API base addresses. | ||||
|  | ||||
| ### Changed | ||||
| - `ely.skinsystem.{hostname}.app.mojang_textures.usernames.round_time` timer will not be recorded if the iteration was | ||||
|   empty. | ||||
|  | ||||
| ## [4.4.1] - 2020-04-24 | ||||
| ### Added | ||||
|   | ||||
							
								
								
									
										22
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								README.md
									
									
									
									
									
								
							| @@ -97,6 +97,14 @@ docker-compose up -d app | ||||
|         <td>Sentry can be used to collect app errors</td> | ||||
|         <td><code>https://public:private@your.sentry.io/1</code></td> | ||||
|     </tr> | ||||
|     <tr> | ||||
|         <td>QUEUE_STRATEGY</td> | ||||
|         <td> | ||||
|             Sets the strategy for the queue in the batch provider of Mojang UUIDs. Allowed values are <code>periodic</code> | ||||
|             and <code>full-bus</code> (see <a href="https://github.com/elyby/chrly/issues/24">#24</a>). | ||||
|         </td> | ||||
|         <td><code>periodic</code></td> | ||||
|     </tr> | ||||
|     <tr> | ||||
|         <td>QUEUE_LOOP_DELAY</td> | ||||
|         <td> | ||||
| @@ -137,6 +145,20 @@ docker-compose up -d app | ||||
|         </td> | ||||
|         <td><code>http://remote-provider.com/api/worker/mojang-uuid</code></td> | ||||
|     </tr> | ||||
|     <tr> | ||||
|         <td>MOJANG_API_BASE_URL</td> | ||||
|         <td> | ||||
|             Allows you to spoof the Mojang's API server address. | ||||
|         </td> | ||||
|         <td><code>https://api.mojang.com</code></td> | ||||
|     </tr> | ||||
|     <tr> | ||||
|         <td>MOJANG_SESSION_SERVER_BASE_URL</td> | ||||
|         <td> | ||||
|             Allows you to spoof the Mojang's Session server address. | ||||
|         </td> | ||||
|         <td><code>https://sessionserver.mojang.com</code></td> | ||||
|     </tr> | ||||
|     <tr> | ||||
|         <td>TEXTURES_EXTRA_PARAM_NAME</td> | ||||
|         <td> | ||||
|   | ||||
| @@ -58,11 +58,17 @@ type ProfileInfo struct { | ||||
| 	IsDemo   bool   `json:"demo,omitempty"` | ||||
| } | ||||
|  | ||||
| var ApiMojangDotComAddr = "https://api.mojang.com" | ||||
| var SessionServerMojangComAddr = "https://sessionserver.mojang.com" | ||||
|  | ||||
| // Exchanges usernames array to array of uuids | ||||
| // See https://wiki.vg/Mojang_API#Playernames_-.3E_UUIDs | ||||
| func UsernamesToUuids(usernames []string) ([]*ProfileInfo, error) { | ||||
| 	requestBody, _ := json.Marshal(usernames) | ||||
| 	request, _ := http.NewRequest("POST", "https://api.mojang.com/profiles/minecraft", bytes.NewBuffer(requestBody)) | ||||
| 	request, err := http.NewRequest("POST", ApiMojangDotComAddr+"/profiles/minecraft", bytes.NewBuffer(requestBody)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	request.Header.Set("Content-Type", "application/json") | ||||
|  | ||||
| @@ -88,12 +94,15 @@ func UsernamesToUuids(usernames []string) ([]*ProfileInfo, error) { | ||||
| // See https://wiki.vg/Mojang_API#UUID_-.3E_Profile_.2B_Skin.2FCape | ||||
| func UuidToTextures(uuid string, signed bool) (*SignedTexturesResponse, error) { | ||||
| 	normalizedUuid := strings.ReplaceAll(uuid, "-", "") | ||||
| 	url := "https://sessionserver.mojang.com/session/minecraft/profile/" + normalizedUuid | ||||
| 	url := SessionServerMojangComAddr + "/session/minecraft/profile/" + normalizedUuid | ||||
| 	if signed { | ||||
| 		url += "?unsigned=false" | ||||
| 	} | ||||
|  | ||||
| 	request, _ := http.NewRequest("GET", url, nil) | ||||
| 	request, err := http.NewRequest("GET", url, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	response, err := HttpClient.Do(request) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| package di | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"net/url" | ||||
| 	"time" | ||||
| @@ -8,21 +9,50 @@ import ( | ||||
| 	"github.com/goava/di" | ||||
| 	"github.com/spf13/viper" | ||||
|  | ||||
| 	"github.com/elyby/chrly/api/mojang" | ||||
| 	es "github.com/elyby/chrly/eventsubscribers" | ||||
| 	"github.com/elyby/chrly/http" | ||||
| 	"github.com/elyby/chrly/mojangtextures" | ||||
| ) | ||||
|  | ||||
| var mojangTextures = di.Options( | ||||
| 	di.Invoke(interceptMojangApiUrls), | ||||
| 	di.Provide(newMojangTexturesProviderFactory), | ||||
| 	di.Provide(newMojangTexturesProvider), | ||||
| 	di.Provide(newMojangTexturesUuidsProviderFactory), | ||||
| 	di.Provide(newMojangTexturesBatchUUIDsProvider), | ||||
| 	di.Provide(newMojangTexturesBatchUUIDsProviderStrategyFactory), | ||||
| 	di.Provide(newMojangTexturesBatchUUIDsProviderDelayedStrategy), | ||||
| 	di.Provide(newMojangTexturesBatchUUIDsProviderFullBusStrategy), | ||||
| 	di.Provide(newMojangTexturesRemoteUUIDsProvider), | ||||
| 	di.Provide(newMojangSignedTexturesProvider), | ||||
| 	di.Provide(newMojangTexturesStorageFactory), | ||||
| ) | ||||
|  | ||||
| func interceptMojangApiUrls(config *viper.Viper) error { | ||||
| 	apiUrl := config.GetString("mojang.api_base_url") | ||||
| 	if apiUrl != "" { | ||||
| 		u, err := url.ParseRequestURI(apiUrl) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		mojang.ApiMojangDotComAddr = u.String() | ||||
| 	} | ||||
|  | ||||
| 	sessionServerUrl := config.GetString("mojang.session_server_base_url") | ||||
| 	if sessionServerUrl != "" { | ||||
| 		u, err := url.ParseRequestURI(apiUrl) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		mojang.SessionServerMojangComAddr = u.String() | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func newMojangTexturesProviderFactory( | ||||
| 	container *di.Container, | ||||
| 	config *viper.Viper, | ||||
| @@ -75,7 +105,7 @@ func newMojangTexturesUuidsProviderFactory( | ||||
|  | ||||
| func newMojangTexturesBatchUUIDsProvider( | ||||
| 	container *di.Container, | ||||
| 	config *viper.Viper, | ||||
| 	strategy mojangtextures.BatchUuidsProviderStrategy, | ||||
| 	emitter mojangtextures.Emitter, | ||||
| ) (*mojangtextures.BatchUuidsProvider, error) { | ||||
| 	if err := container.Provide(func(emitter es.Subscriber, config *viper.Viper) *namedHealthChecker { | ||||
| @@ -106,14 +136,56 @@ func newMojangTexturesBatchUUIDsProvider( | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return mojangtextures.NewBatchUuidsProvider(context.Background(), strategy, emitter), nil | ||||
| } | ||||
|  | ||||
| func newMojangTexturesBatchUUIDsProviderStrategyFactory( | ||||
| 	container *di.Container, | ||||
| 	config *viper.Viper, | ||||
| ) (mojangtextures.BatchUuidsProviderStrategy, error) { | ||||
| 	config.SetDefault("queue.strategy", "periodic") | ||||
|  | ||||
| 	strategyName := config.GetString("queue.strategy") | ||||
| 	switch strategyName { | ||||
| 	case "periodic": | ||||
| 		var strategy *mojangtextures.PeriodicStrategy | ||||
| 		err := container.Resolve(&strategy) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		return strategy, nil | ||||
| 	case "full-bus": | ||||
| 		var strategy *mojangtextures.FullBusStrategy | ||||
| 		err := container.Resolve(&strategy) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		return strategy, nil | ||||
| 	default: | ||||
| 		return nil, fmt.Errorf("unknown queue strategy \"%s\"", strategyName) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func newMojangTexturesBatchUUIDsProviderDelayedStrategy(config *viper.Viper) *mojangtextures.PeriodicStrategy { | ||||
| 	config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond) | ||||
| 	config.SetDefault("queue.batch_size", 10) | ||||
|  | ||||
| 	return &mojangtextures.BatchUuidsProvider{ | ||||
| 		Emitter:        emitter, | ||||
| 		IterationDelay: config.GetDuration("queue.loop_delay"), | ||||
| 		IterationSize:  config.GetInt("queue.batch_size"), | ||||
| 	}, nil | ||||
| 	return mojangtextures.NewPeriodicStrategy( | ||||
| 		config.GetDuration("queue.loop_delay"), | ||||
| 		config.GetInt("queue.batch_size"), | ||||
| 	) | ||||
| } | ||||
|  | ||||
| func newMojangTexturesBatchUUIDsProviderFullBusStrategy(config *viper.Viper) *mojangtextures.FullBusStrategy { | ||||
| 	config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond) | ||||
| 	config.SetDefault("queue.batch_size", 10) | ||||
|  | ||||
| 	return mojangtextures.NewFullBusStrategy( | ||||
| 		config.GetDuration("queue.loop_delay"), | ||||
| 		config.GetInt("queue.batch_size"), | ||||
| 	) | ||||
| } | ||||
|  | ||||
| func newMojangTexturesRemoteUUIDsProvider( | ||||
|   | ||||
| @@ -96,12 +96,12 @@ func (s *StatsReporter) ConfigureWithDispatcher(d Subscriber) { | ||||
| 	d.Subscribe("mojang_textures:batch_uuids_provider:round", func(usernames []string, queueSize int) { | ||||
| 		s.UpdateGauge("mojang_textures.usernames.iteration_size", int64(len(usernames))) | ||||
| 		s.UpdateGauge("mojang_textures.usernames.queue_size", int64(queueSize)) | ||||
| 		if len(usernames) != 0 { | ||||
| 			s.startTimeRecording("batch_uuids_provider_round_time_" + strings.Join(usernames, "|")) | ||||
| 		} | ||||
| 	}) | ||||
| 	d.Subscribe("mojang_textures:batch_uuids_provider:before_round", func() { | ||||
| 		s.startTimeRecording("batch_uuids_provider_round_time") | ||||
| 	}) | ||||
| 	d.Subscribe("mojang_textures:batch_uuids_provider:after_round", func() { | ||||
| 		s.finalizeTimeRecording("batch_uuids_provider_round_time", "mojang_textures.usernames.round_time") | ||||
| 	d.Subscribe("mojang_textures:batch_uuids_provider:result", func(usernames []string, profiles []*mojang.ProfileInfo, err error) { | ||||
| 		s.finalizeTimeRecording("batch_uuids_provider_round_time_"+strings.Join(usernames, "|"), "mojang_textures.usernames.round_time") | ||||
| 	}) | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -337,19 +337,24 @@ var statsReporterTestCases = []*StatsReporterTestCase{ | ||||
| 	{ | ||||
| 		Events: [][]interface{}{ | ||||
| 			{"mojang_textures:batch_uuids_provider:round", []string{"username1", "username2"}, 5}, | ||||
| 			{"mojang_textures:batch_uuids_provider:result", []string{"username1", "username2"}, []*mojang.ProfileInfo{}, nil}, | ||||
| 		}, | ||||
| 		ExpectedCalls: [][]interface{}{ | ||||
| 			{"UpdateGauge", "mojang_textures.usernames.iteration_size", int64(2)}, | ||||
| 			{"UpdateGauge", "mojang_textures.usernames.queue_size", int64(5)}, | ||||
| 			{"RecordTimer", "mojang_textures.usernames.round_time", mock.AnythingOfType("time.Duration")}, | ||||
| 		}, | ||||
| 	}, | ||||
| 	{ | ||||
| 		Events: [][]interface{}{ | ||||
| 			{"mojang_textures:batch_uuids_provider:before_round"}, | ||||
| 			{"mojang_textures:batch_uuids_provider:after_round"}, | ||||
| 			{"mojang_textures:batch_uuids_provider:round", []string{}, 0}, | ||||
| 			// This event will be not emitted, but we emit it to ensure, that RecordTimer will not be called | ||||
| 			{"mojang_textures:batch_uuids_provider:result", []string{}, []*mojang.ProfileInfo{}, nil}, | ||||
| 		}, | ||||
| 		ExpectedCalls: [][]interface{}{ | ||||
| 			{"RecordTimer", "mojang_textures.usernames.round_time", mock.AnythingOfType("time.Duration")}, | ||||
| 			{"UpdateGauge", "mojang_textures.usernames.iteration_size", int64(0)}, | ||||
| 			{"UpdateGauge", "mojang_textures.usernames.queue_size", int64(0)}, | ||||
| 			// Should not call RecordTimer | ||||
| 		}, | ||||
| 	}, | ||||
| } | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| package mojangtextures | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| @@ -9,131 +10,234 @@ import ( | ||||
| ) | ||||
|  | ||||
| type jobResult struct { | ||||
| 	profile *mojang.ProfileInfo | ||||
| 	error   error | ||||
| 	Profile *mojang.ProfileInfo | ||||
| 	Error   error | ||||
| } | ||||
|  | ||||
| type jobItem struct { | ||||
| 	username    string | ||||
| 	respondChan chan *jobResult | ||||
| type job struct { | ||||
| 	Username    string | ||||
| 	RespondChan chan *jobResult | ||||
| } | ||||
|  | ||||
| type jobsQueue struct { | ||||
| 	lock  sync.Mutex | ||||
| 	items []*jobItem | ||||
| 	items []*job | ||||
| } | ||||
|  | ||||
| func (s *jobsQueue) New() *jobsQueue { | ||||
| 	s.items = []*jobItem{} | ||||
| 	return s | ||||
| } | ||||
|  | ||||
| func (s *jobsQueue) Enqueue(t *jobItem) { | ||||
| 	s.lock.Lock() | ||||
| 	defer s.lock.Unlock() | ||||
|  | ||||
| 	s.items = append(s.items, t) | ||||
| } | ||||
|  | ||||
| func (s *jobsQueue) Dequeue(n int) []*jobItem { | ||||
| 	s.lock.Lock() | ||||
| 	defer s.lock.Unlock() | ||||
|  | ||||
| 	if n > s.size() { | ||||
| 		n = s.size() | ||||
| func newJobsQueue() *jobsQueue { | ||||
| 	return &jobsQueue{ | ||||
| 		items: []*job{}, | ||||
| 	} | ||||
|  | ||||
| 	items := s.items[0:n] | ||||
| 	s.items = s.items[n:len(s.items)] | ||||
|  | ||||
| 	return items | ||||
| } | ||||
|  | ||||
| func (s *jobsQueue) Size() int { | ||||
| func (s *jobsQueue) Enqueue(job *job) int { | ||||
| 	s.lock.Lock() | ||||
| 	defer s.lock.Unlock() | ||||
|  | ||||
| 	return s.size() | ||||
| } | ||||
| 	s.items = append(s.items, job) | ||||
|  | ||||
| func (s *jobsQueue) size() int { | ||||
| 	return len(s.items) | ||||
| } | ||||
|  | ||||
| func (s *jobsQueue) Dequeue(n int) ([]*job, int) { | ||||
| 	s.lock.Lock() | ||||
| 	defer s.lock.Unlock() | ||||
|  | ||||
| 	l := len(s.items) | ||||
| 	if n > l { | ||||
| 		n = l | ||||
| 	} | ||||
|  | ||||
| 	items := s.items[0:n] | ||||
| 	s.items = s.items[n:l] | ||||
|  | ||||
| 	return items, l - n | ||||
| } | ||||
|  | ||||
| var usernamesToUuids = mojang.UsernamesToUuids | ||||
| var forever = func() bool { | ||||
| 	return true | ||||
|  | ||||
| type JobsIteration struct { | ||||
| 	Jobs  []*job | ||||
| 	Queue int | ||||
| 	c     chan struct{} | ||||
| } | ||||
|  | ||||
| func (j *JobsIteration) Done() { | ||||
| 	if j.c != nil { | ||||
| 		close(j.c) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type BatchUuidsProviderStrategy interface { | ||||
| 	Queue(job *job) | ||||
| 	GetJobs(abort context.Context) <-chan *JobsIteration | ||||
| } | ||||
|  | ||||
| type PeriodicStrategy struct { | ||||
| 	Delay time.Duration | ||||
| 	Batch int | ||||
| 	queue *jobsQueue | ||||
| 	done  chan struct{} | ||||
| } | ||||
|  | ||||
| func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy { | ||||
| 	return &PeriodicStrategy{ | ||||
| 		Delay: delay, | ||||
| 		Batch: batch, | ||||
| 		queue: newJobsQueue(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (ctx *PeriodicStrategy) Queue(job *job) { | ||||
| 	ctx.queue.Enqueue(job) | ||||
| } | ||||
|  | ||||
| func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteration { | ||||
| 	ch := make(chan *JobsIteration) | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-abort.Done(): | ||||
| 				close(ch) | ||||
| 				return | ||||
| 			case <-time.After(ctx.Delay): | ||||
| 				jobs, queueLen := ctx.queue.Dequeue(ctx.Batch) | ||||
| 				jobDoneChan := make(chan struct{}) | ||||
| 				ch <- &JobsIteration{jobs, queueLen, jobDoneChan} | ||||
| 				<-jobDoneChan | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	return ch | ||||
| } | ||||
|  | ||||
| type FullBusStrategy struct { | ||||
| 	Delay     time.Duration | ||||
| 	Batch     int | ||||
| 	queue     *jobsQueue | ||||
| 	busIsFull chan bool | ||||
| } | ||||
|  | ||||
| func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy { | ||||
| 	return &FullBusStrategy{ | ||||
| 		Delay:     delay, | ||||
| 		Batch:     batch, | ||||
| 		queue:     newJobsQueue(), | ||||
| 		busIsFull: make(chan bool), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (ctx *FullBusStrategy) Queue(job *job) { | ||||
| 	n := ctx.queue.Enqueue(job) | ||||
| 	if n % ctx.Batch == 0 { | ||||
| 		ctx.busIsFull <- true | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Формально, это описание логики водителя маршрутки xD | ||||
| func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration { | ||||
| 	ch := make(chan *JobsIteration) | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			t := time.NewTimer(ctx.Delay) | ||||
| 			select { | ||||
| 			case <-abort.Done(): | ||||
| 				close(ch) | ||||
| 				return | ||||
| 			case <-t.C: | ||||
| 				ctx.sendJobs(ch) | ||||
| 			case <-ctx.busIsFull: | ||||
| 				t.Stop() | ||||
| 				ctx.sendJobs(ch) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	return ch | ||||
| } | ||||
|  | ||||
| func (ctx *FullBusStrategy) sendJobs(ch chan *JobsIteration) { | ||||
| 	jobs, queueLen := ctx.queue.Dequeue(ctx.Batch) | ||||
| 	ch <- &JobsIteration{jobs, queueLen, nil} | ||||
| } | ||||
|  | ||||
| type BatchUuidsProvider struct { | ||||
| 	Emitter | ||||
|  | ||||
| 	IterationDelay time.Duration | ||||
| 	IterationSize  int | ||||
|  | ||||
| 	context     context.Context | ||||
| 	emitter     Emitter | ||||
| 	strategy    BatchUuidsProviderStrategy | ||||
| 	onFirstCall sync.Once | ||||
| 	queue       jobsQueue | ||||
| } | ||||
|  | ||||
| func NewBatchUuidsProvider( | ||||
| 	context context.Context, | ||||
| 	strategy BatchUuidsProviderStrategy, | ||||
| 	emitter Emitter, | ||||
| ) *BatchUuidsProvider { | ||||
| 	return &BatchUuidsProvider{ | ||||
| 		context:  context, | ||||
| 		emitter:  emitter, | ||||
| 		strategy: strategy, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) { | ||||
| 	ctx.onFirstCall.Do(func() { | ||||
| 		ctx.queue.New() | ||||
| 		ctx.startQueue() | ||||
| 	}) | ||||
| 	ctx.onFirstCall.Do(ctx.startQueue) | ||||
|  | ||||
| 	resultChan := make(chan *jobResult) | ||||
| 	ctx.queue.Enqueue(&jobItem{username, resultChan}) | ||||
| 	ctx.Emit("mojang_textures:batch_uuids_provider:queued", username) | ||||
| 	ctx.strategy.Queue(&job{username, resultChan}) | ||||
| 	ctx.emitter.Emit("mojang_textures:batch_uuids_provider:queued", username) | ||||
|  | ||||
| 	result := <-resultChan | ||||
|  | ||||
| 	return result.profile, result.error | ||||
| 	return result.Profile, result.Error | ||||
| } | ||||
|  | ||||
| func (ctx *BatchUuidsProvider) startQueue() { | ||||
| 	go func() { | ||||
| 		time.Sleep(ctx.IterationDelay) | ||||
| 		for forever() { | ||||
| 			ctx.Emit("mojang_textures:batch_uuids_provider:before_round") | ||||
| 			ctx.queueRound() | ||||
| 			ctx.Emit("mojang_textures:batch_uuids_provider:after_round") | ||||
| 			time.Sleep(ctx.IterationDelay) | ||||
| 		jobsChan := ctx.strategy.GetJobs(ctx.context) | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-ctx.context.Done(): | ||||
| 				return | ||||
| 			case iteration := <-jobsChan: | ||||
| 				go func() { | ||||
| 					ctx.performRequest(iteration) | ||||
| 					iteration.Done() | ||||
| 				}() | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func (ctx *BatchUuidsProvider) queueRound() { | ||||
| 	queueSize := ctx.queue.Size() | ||||
| 	jobs := ctx.queue.Dequeue(ctx.IterationSize) | ||||
|  | ||||
| 	var usernames []string | ||||
| 	for _, job := range jobs { | ||||
| 		usernames = append(usernames, job.username) | ||||
| func (ctx *BatchUuidsProvider) performRequest(iteration *JobsIteration) { | ||||
| 	usernames := make([]string, len(iteration.Jobs)) | ||||
| 	for i, job := range iteration.Jobs { | ||||
| 		usernames[i] = job.Username | ||||
| 	} | ||||
|  | ||||
| 	ctx.Emit("mojang_textures:batch_uuids_provider:round", usernames, queueSize-len(jobs)) | ||||
| 	ctx.emitter.Emit("mojang_textures:batch_uuids_provider:round", usernames, iteration.Queue) | ||||
| 	if len(usernames) == 0 { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	profiles, err := usernamesToUuids(usernames) | ||||
| 	ctx.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err) | ||||
| 	for _, job := range jobs { | ||||
| 		go func(job *jobItem) { | ||||
| 	ctx.emitter.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err) | ||||
| 	for _, job := range iteration.Jobs { | ||||
| 		response := &jobResult{} | ||||
| 			if err != nil { | ||||
| 				response.error = err | ||||
| 			} else { | ||||
| 		if err == nil { | ||||
| 			// The profiles in the response aren't ordered, so we must search each username over full array | ||||
| 			for _, profile := range profiles { | ||||
| 					if strings.EqualFold(job.username, profile.Name) { | ||||
| 						response.profile = profile | ||||
| 				if strings.EqualFold(job.Username, profile.Name) { | ||||
| 					response.Profile = profile | ||||
| 					break | ||||
| 				} | ||||
| 			} | ||||
| 		} else { | ||||
| 			response.Error = err | ||||
| 		} | ||||
|  | ||||
| 			job.respondChan <- response | ||||
| 		}(job) | ||||
| 		job.RespondChan <- response | ||||
| 		close(job.RespondChan) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -1,64 +1,51 @@ | ||||
| package mojangtextures | ||||
|  | ||||
| import ( | ||||
| 	"crypto/rand" | ||||
| 	"encoding/base64" | ||||
| 	"strings" | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	testify "github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/mock" | ||||
| 	"github.com/stretchr/testify/require" | ||||
| 	"github.com/stretchr/testify/suite" | ||||
|  | ||||
| 	"github.com/elyby/chrly/api/mojang" | ||||
| ) | ||||
|  | ||||
| func TestJobsQueue(t *testing.T) { | ||||
| 	createQueue := func() *jobsQueue { | ||||
| 		queue := &jobsQueue{} | ||||
| 		queue.New() | ||||
|  | ||||
| 		return queue | ||||
| 	} | ||||
|  | ||||
| 	t.Run("Enqueue", func(t *testing.T) { | ||||
| 		assert := testify.New(t) | ||||
|  | ||||
| 		s := createQueue() | ||||
| 		s.Enqueue(&jobItem{username: "username1"}) | ||||
| 		s.Enqueue(&jobItem{username: "username2"}) | ||||
| 		s.Enqueue(&jobItem{username: "username3"}) | ||||
|  | ||||
| 		assert.Equal(3, s.Size()) | ||||
| 		s := newJobsQueue() | ||||
| 		require.Equal(t, 1, s.Enqueue(&job{Username: "username1"})) | ||||
| 		require.Equal(t, 2, s.Enqueue(&job{Username: "username2"})) | ||||
| 		require.Equal(t, 3, s.Enqueue(&job{Username: "username3"})) | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("Dequeue", func(t *testing.T) { | ||||
| 		assert := testify.New(t) | ||||
| 		s := newJobsQueue() | ||||
| 		s.Enqueue(&job{Username: "username1"}) | ||||
| 		s.Enqueue(&job{Username: "username2"}) | ||||
| 		s.Enqueue(&job{Username: "username3"}) | ||||
| 		s.Enqueue(&job{Username: "username4"}) | ||||
| 		s.Enqueue(&job{Username: "username5"}) | ||||
|  | ||||
| 		s := createQueue() | ||||
| 		s.Enqueue(&jobItem{username: "username1"}) | ||||
| 		s.Enqueue(&jobItem{username: "username2"}) | ||||
| 		s.Enqueue(&jobItem{username: "username3"}) | ||||
| 		s.Enqueue(&jobItem{username: "username4"}) | ||||
| 		items, queueLen := s.Dequeue(2) | ||||
| 		require.Len(t, items, 2) | ||||
| 		require.Equal(t, 3, queueLen) | ||||
| 		require.Equal(t, "username1", items[0].Username) | ||||
| 		require.Equal(t, "username2", items[1].Username) | ||||
|  | ||||
| 		items := s.Dequeue(2) | ||||
| 		assert.Len(items, 2) | ||||
| 		assert.Equal("username1", items[0].username) | ||||
| 		assert.Equal("username2", items[1].username) | ||||
| 		assert.Equal(2, s.Size()) | ||||
|  | ||||
| 		items = s.Dequeue(40) | ||||
| 		assert.Len(items, 2) | ||||
| 		assert.Equal("username3", items[0].username) | ||||
| 		assert.Equal("username4", items[1].username) | ||||
| 		items, queueLen = s.Dequeue(40) | ||||
| 		require.Len(t, items, 3) | ||||
| 		require.Equal(t, 0, queueLen) | ||||
| 		require.Equal(t, "username3", items[0].Username) | ||||
| 		require.Equal(t, "username4", items[1].Username) | ||||
| 		require.Equal(t, "username5", items[2].Username) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // This is really stupid test just to get 100% coverage on this package :) | ||||
| func TestBatchUuidsProvider_forever(t *testing.T) { | ||||
| 	testify.True(t, forever()) | ||||
| } | ||||
|  | ||||
| type mojangUsernamesToUuidsRequestMock struct { | ||||
| 	mock.Mock | ||||
| } | ||||
| @@ -73,6 +60,37 @@ func (o *mojangUsernamesToUuidsRequestMock) UsernamesToUuids(usernames []string) | ||||
| 	return result, args.Error(1) | ||||
| } | ||||
|  | ||||
| type manualStrategy struct { | ||||
| 	ch   chan *JobsIteration | ||||
| 	once sync.Once | ||||
| 	lock sync.Mutex | ||||
| 	jobs []*job | ||||
| } | ||||
|  | ||||
| func (m *manualStrategy) Queue(job *job) { | ||||
| 	m.lock.Lock() | ||||
| 	m.jobs = append(m.jobs, job) | ||||
| 	m.lock.Unlock() | ||||
| } | ||||
|  | ||||
| func (m *manualStrategy) GetJobs(_ context.Context) <-chan *JobsIteration { | ||||
| 	m.lock.Lock() | ||||
| 	defer m.lock.Unlock() | ||||
| 	m.ch = make(chan *JobsIteration) | ||||
|  | ||||
| 	return m.ch | ||||
| } | ||||
|  | ||||
| func (m *manualStrategy) Iterate(countJobsToReturn int, countLeftJobsInQueue int) { | ||||
| 	m.lock.Lock() | ||||
| 	defer m.lock.Unlock() | ||||
|  | ||||
| 	m.ch <- &JobsIteration{ | ||||
| 		Jobs:  m.jobs[0:countJobsToReturn], | ||||
| 		Queue: countLeftJobsInQueue, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type batchUuidsProviderGetUuidResult struct { | ||||
| 	Result *mojang.ProfileInfo | ||||
| 	Error  error | ||||
| @@ -82,48 +100,35 @@ type batchUuidsProviderTestSuite struct { | ||||
| 	suite.Suite | ||||
|  | ||||
| 	Provider *BatchUuidsProvider | ||||
| 	GetUuidAsync func(username string) chan *batchUuidsProviderGetUuidResult | ||||
|  | ||||
| 	Emitter   *mockEmitter | ||||
| 	Strategy  *manualStrategy | ||||
| 	MojangApi *mojangUsernamesToUuidsRequestMock | ||||
|  | ||||
| 	Iterate     func() | ||||
| 	done        func() | ||||
| 	iterateChan chan bool | ||||
| 	GetUuidAsync func(username string) <-chan *batchUuidsProviderGetUuidResult | ||||
| 	stop         context.CancelFunc | ||||
| } | ||||
|  | ||||
| func (suite *batchUuidsProviderTestSuite) SetupTest() { | ||||
| 	suite.Emitter = &mockEmitter{} | ||||
| 	suite.Strategy = &manualStrategy{} | ||||
| 	ctx, stop := context.WithCancel(context.Background()) | ||||
| 	suite.stop = stop | ||||
| 	suite.MojangApi = &mojangUsernamesToUuidsRequestMock{} | ||||
| 	usernamesToUuids = suite.MojangApi.UsernamesToUuids | ||||
|  | ||||
| 	suite.Provider = &BatchUuidsProvider{ | ||||
| 		Emitter:        suite.Emitter, | ||||
| 		IterationDelay: 0, | ||||
| 		IterationSize:  10, | ||||
| 	} | ||||
| 	suite.Provider = NewBatchUuidsProvider(ctx, suite.Strategy, suite.Emitter) | ||||
|  | ||||
| 	suite.iterateChan = make(chan bool) | ||||
| 	forever = func() bool { | ||||
| 		return <-suite.iterateChan | ||||
| 	} | ||||
|  | ||||
| 	suite.Iterate = func() { | ||||
| 		suite.iterateChan <- true | ||||
| 	} | ||||
|  | ||||
| 	suite.done = func() { | ||||
| 		suite.iterateChan <- false | ||||
| 	} | ||||
|  | ||||
| 	suite.GetUuidAsync = func(username string) chan *batchUuidsProviderGetUuidResult { | ||||
| 		s := make(chan bool) | ||||
| 	suite.GetUuidAsync = func(username string) <-chan *batchUuidsProviderGetUuidResult { | ||||
| 		s := make(chan struct{}) | ||||
| 		// This dirty hack ensures, that the username will be queued before we return control to the caller. | ||||
| 		// It's needed to keep expected calls order and prevent cases when iteration happens before all usernames | ||||
| 		// will be queued. | ||||
| 		// It's needed to keep expected calls order and prevent cases when iteration happens before | ||||
| 		// all usernames will be queued. | ||||
| 		suite.Emitter.On("Emit", | ||||
| 			"mojang_textures:batch_uuids_provider:queued", | ||||
| 			username, | ||||
| 		).Once().Run(func(args mock.Arguments) { | ||||
| 			s <- true | ||||
| 			close(s) | ||||
| 		}) | ||||
|  | ||||
| 		c := make(chan *batchUuidsProviderGetUuidResult) | ||||
| @@ -139,13 +144,10 @@ func (suite *batchUuidsProviderTestSuite) SetupTest() { | ||||
|  | ||||
| 		return c | ||||
| 	} | ||||
|  | ||||
| 	suite.MojangApi = &mojangUsernamesToUuidsRequestMock{} | ||||
| 	usernamesToUuids = suite.MojangApi.UsernamesToUuids | ||||
| } | ||||
|  | ||||
| func (suite *batchUuidsProviderTestSuite) TearDownTest() { | ||||
| 	suite.done() | ||||
| 	suite.stop() | ||||
| 	suite.Emitter.AssertExpectations(suite.T()) | ||||
| 	suite.MojangApi.AssertExpectations(suite.T()) | ||||
| } | ||||
| @@ -154,37 +156,14 @@ func TestBatchUuidsProvider(t *testing.T) { | ||||
| 	suite.Run(t, new(batchUuidsProviderTestSuite)) | ||||
| } | ||||
|  | ||||
| func (suite *batchUuidsProviderTestSuite) TestGetUuidForOneUsername() { | ||||
| 	expectedUsernames := []string{"username"} | ||||
| 	expectedResult := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username"} | ||||
| 	expectedResponse := []*mojang.ProfileInfo{expectedResult} | ||||
|  | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() | ||||
|  | ||||
| 	suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{expectedResult}, nil) | ||||
|  | ||||
| 	resultChan := suite.GetUuidAsync("username") | ||||
|  | ||||
| 	suite.Iterate() | ||||
|  | ||||
| 	result := <-resultChan | ||||
| 	suite.Assert().Equal(expectedResult, result.Result) | ||||
| 	suite.Assert().Nil(result.Error) | ||||
| } | ||||
|  | ||||
| func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { | ||||
| func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernames() { | ||||
| 	expectedUsernames := []string{"username1", "username2"} | ||||
| 	expectedResult1 := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username1"} | ||||
| 	expectedResult2 := &mojang.ProfileInfo{Id: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", Name: "username2"} | ||||
| 	expectedResponse := []*mojang.ProfileInfo{expectedResult1, expectedResult2} | ||||
|  | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() | ||||
|  | ||||
| 	suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{ | ||||
| 		expectedResult1, | ||||
| @@ -194,7 +173,7 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { | ||||
| 	resultChan1 := suite.GetUuidAsync("username1") | ||||
| 	resultChan2 := suite.GetUuidAsync("username2") | ||||
|  | ||||
| 	suite.Iterate() | ||||
| 	suite.Strategy.Iterate(2, 0) | ||||
|  | ||||
| 	result1 := <-resultChan1 | ||||
| 	suite.Assert().Equal(expectedResult1, result1.Result) | ||||
| @@ -205,78 +184,40 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { | ||||
| 	suite.Assert().Nil(result2.Error) | ||||
| } | ||||
|  | ||||
| func (suite *batchUuidsProviderTestSuite) TestGetUuidForMoreThan10Usernames() { | ||||
| 	usernames := make([]string, 12) | ||||
| 	for i := 0; i < cap(usernames); i++ { | ||||
| 		usernames[i] = randStr(8) | ||||
| 	} | ||||
| func (suite *batchUuidsProviderTestSuite) TestShouldNotSendRequestWhenNoJobsAreReturned() { | ||||
| 	//noinspection GoPreferNilSlice | ||||
| 	emptyUsernames := []string{} | ||||
| 	done := make(chan struct{}) | ||||
| 	suite.Emitter.On("Emit", | ||||
| 		"mojang_textures:batch_uuids_provider:round", | ||||
| 		emptyUsernames, | ||||
| 		1, | ||||
| 	).Once().Run(func(args mock.Arguments) { | ||||
| 		close(done) | ||||
| 	}) | ||||
|  | ||||
| 	// In this test we're not testing response, so always return an empty resultset | ||||
| 	expectedResponse := []*mojang.ProfileInfo{} | ||||
| 	_ = suite.GetUuidAsync("username") // Schedule one username to run the queue | ||||
|  | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Twice() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[0:10], 2).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[0:10], expectedResponse, nil).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[10:12], 0).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[10:12], expectedResponse, nil).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Twice() | ||||
|  | ||||
| 	suite.MojangApi.On("UsernamesToUuids", usernames[0:10]).Once().Return(expectedResponse, nil) | ||||
| 	suite.MojangApi.On("UsernamesToUuids", usernames[10:12]).Once().Return(expectedResponse, nil) | ||||
|  | ||||
| 	channels := make([]chan *batchUuidsProviderGetUuidResult, len(usernames)) | ||||
| 	for i, username := range usernames { | ||||
| 		channels[i] = suite.GetUuidAsync(username) | ||||
| 	} | ||||
|  | ||||
| 	suite.Iterate() | ||||
| 	suite.Iterate() | ||||
|  | ||||
| 	for _, channel := range channels { | ||||
| 		<-channel | ||||
| 	} | ||||
| 	suite.Strategy.Iterate(0, 1) // Return no jobs and indicate that there is one job in queue | ||||
| 	<-done | ||||
| } | ||||
|  | ||||
| func (suite *batchUuidsProviderTestSuite) TestDoNothingWhenNoTasks() { | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Times(3) | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", []string{"username"}, 0).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", []string{"username"}, mock.Anything, nil).Once() | ||||
| 	var nilStringSlice []string | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", nilStringSlice, 0).Twice() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Times(3) | ||||
|  | ||||
| 	suite.MojangApi.On("UsernamesToUuids", []string{"username"}).Once().Return([]*mojang.ProfileInfo{}, nil) | ||||
|  | ||||
| 	// Perform first iteration and await it finishes | ||||
| 	resultChan := suite.GetUuidAsync("username") | ||||
|  | ||||
| 	suite.Iterate() | ||||
|  | ||||
| 	result := <-resultChan | ||||
| 	suite.Assert().Nil(result.Result) | ||||
| 	suite.Assert().Nil(result.Error) | ||||
|  | ||||
| 	// Let it to perform a few more iterations to ensure, that there are no calls to external APIs | ||||
| 	suite.Iterate() | ||||
| 	suite.Iterate() | ||||
| } | ||||
|  | ||||
| func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError() { | ||||
| // Test written for multiple usernames to ensure that the error | ||||
| // will be returned for each iteration group | ||||
| func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernamesWithAnError() { | ||||
| 	expectedUsernames := []string{"username1", "username2"} | ||||
| 	expectedError := &mojang.TooManyRequestsError{} | ||||
| 	var nilProfilesResponse []*mojang.ProfileInfo | ||||
|  | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, nilProfilesResponse, expectedError).Once() | ||||
| 	suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() | ||||
|  | ||||
| 	suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return(nil, expectedError) | ||||
|  | ||||
| 	resultChan1 := suite.GetUuidAsync("username1") | ||||
| 	resultChan2 := suite.GetUuidAsync("username2") | ||||
|  | ||||
| 	suite.Iterate() | ||||
| 	suite.Strategy.Iterate(2, 0) | ||||
|  | ||||
| 	result1 := <-resultChan1 | ||||
| 	suite.Assert().Nil(result1.Result) | ||||
| @@ -287,14 +228,213 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError( | ||||
| 	suite.Assert().Equal(expectedError, result2.Error) | ||||
| } | ||||
|  | ||||
| var replacer = strings.NewReplacer("-", "_", "=", "") | ||||
| func TestPeriodicStrategy(t *testing.T) { | ||||
| 	t.Run("should return first job only after duration", func(t *testing.T) { | ||||
| 		d := 20 * time.Millisecond | ||||
| 		strategy := NewPeriodicStrategy(d, 10) | ||||
| 		j := &job{} | ||||
| 		strategy.Queue(j) | ||||
|  | ||||
| // https://stackoverflow.com/a/50581165 | ||||
| func randStr(len int) string { | ||||
| 	buff := make([]byte, len) | ||||
| 	_, _ = rand.Read(buff) | ||||
| 	str := replacer.Replace(base64.URLEncoding.EncodeToString(buff)) | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
| 		startedAt := time.Now() | ||||
| 		ch := strategy.GetJobs(ctx) | ||||
| 		iteration := <-ch | ||||
| 		durationBeforeResult := time.Now().Sub(startedAt) | ||||
| 		require.True(t, durationBeforeResult >= d) | ||||
| 		require.True(t, durationBeforeResult < d*2) | ||||
|  | ||||
| 	// Base 64 can be longer than len | ||||
| 	return str[:len] | ||||
| 		require.Equal(t, []*job{j}, iteration.Jobs) | ||||
| 		require.Equal(t, 0, iteration.Queue) | ||||
|  | ||||
| 		cancel() | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("should return the configured batch size", func(t *testing.T) { | ||||
| 		strategy := NewPeriodicStrategy(0, 10) | ||||
| 		jobs := make([]*job, 15) | ||||
| 		for i := 0; i < 15; i++ { | ||||
| 			jobs[i] = &job{Username: strconv.Itoa(i)} | ||||
| 			strategy.Queue(jobs[i]) | ||||
| 		} | ||||
|  | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
| 		ch := strategy.GetJobs(ctx) | ||||
| 		iteration := <-ch | ||||
| 		require.Len(t, iteration.Jobs, 10) | ||||
| 		require.Equal(t, jobs[0:10], iteration.Jobs) | ||||
| 		require.Equal(t, 5, iteration.Queue) | ||||
|  | ||||
| 		cancel() | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("should not return the next iteration until the previous one is finished", func(t *testing.T) { | ||||
| 		strategy := NewPeriodicStrategy(0, 10) | ||||
| 		strategy.Queue(&job{}) | ||||
|  | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
| 		ch := strategy.GetJobs(ctx) | ||||
| 		iteration := <-ch | ||||
| 		require.Len(t, iteration.Jobs, 1) | ||||
| 		require.Equal(t, 0, iteration.Queue) | ||||
|  | ||||
| 		time.Sleep(time.Millisecond) // Let strategy's internal loop to work (if the implementation is broken) | ||||
|  | ||||
| 		select { | ||||
| 		case <-ch: | ||||
| 			require.Fail(t, "the previous iteration isn't marked as done") | ||||
| 		default: | ||||
| 			// ok | ||||
| 		} | ||||
|  | ||||
| 		iteration.Done() | ||||
|  | ||||
| 		time.Sleep(time.Millisecond) // Let strategy's internal loop to work | ||||
|  | ||||
| 		select { | ||||
| 		case iteration = <-ch: | ||||
| 			// ok | ||||
| 		default: | ||||
| 			require.Fail(t, "iteration should be provided") | ||||
| 		} | ||||
|  | ||||
| 		require.Empty(t, iteration.Jobs) | ||||
| 		require.Equal(t, 0, iteration.Queue) | ||||
| 		iteration.Done() | ||||
|  | ||||
| 		cancel() | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("each iteration should be returned only after the configured duration", func(t *testing.T) { | ||||
| 		d := 5 * time.Millisecond | ||||
| 		strategy := NewPeriodicStrategy(d, 10) | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
| 		ch := strategy.GetJobs(ctx) | ||||
| 		for i := 0; i < 3; i++ { | ||||
| 			startedAt := time.Now() | ||||
| 			iteration := <-ch | ||||
| 			durationBeforeResult := time.Now().Sub(startedAt) | ||||
| 			require.True(t, durationBeforeResult >= d) | ||||
| 			require.True(t, durationBeforeResult < d*2) | ||||
|  | ||||
| 			require.Empty(t, iteration.Jobs) | ||||
| 			require.Equal(t, 0, iteration.Queue) | ||||
|  | ||||
| 			// Sleep for at least doubled duration before calling Done() to check, | ||||
| 			// that this duration isn't included into the next iteration time | ||||
| 			time.Sleep(d * 2) | ||||
| 			iteration.Done() | ||||
| 		} | ||||
|  | ||||
| 		cancel() | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func TestFullBusStrategy(t *testing.T) { | ||||
| 	t.Run("should provide iteration immediately when the batch size exceeded", func(t *testing.T) { | ||||
| 		jobs := make([]*job, 10) | ||||
| 		for i := 0; i < 10; i++ { | ||||
| 			jobs[i] = &job{} | ||||
| 		} | ||||
|  | ||||
| 		d := 20 * time.Millisecond | ||||
| 		strategy := NewFullBusStrategy(d, 10) | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
| 		ch := strategy.GetJobs(ctx) | ||||
|  | ||||
| 		done := make(chan struct{}) | ||||
| 		go func() { | ||||
| 			defer close(done) | ||||
| 			select { | ||||
| 			case iteration := <-ch: | ||||
| 				require.Len(t, iteration.Jobs, 10) | ||||
| 				require.Equal(t, 0, iteration.Queue) | ||||
| 			case <-time.After(d): | ||||
| 				require.Fail(t, "iteration should be provided immediately") | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		for _, j := range jobs { | ||||
| 			strategy.Queue(j) | ||||
| 		} | ||||
|  | ||||
| 		<-done | ||||
|  | ||||
| 		cancel() | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("should provide iteration after duration if batch size isn't exceeded", func(t *testing.T) { | ||||
| 		jobs := make([]*job, 9) | ||||
| 		for i := 0; i < 9; i++ { | ||||
| 			jobs[i] = &job{} | ||||
| 		} | ||||
|  | ||||
| 		d := 20 * time.Millisecond | ||||
| 		strategy := NewFullBusStrategy(d, 10) | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
|  | ||||
| 		startedAt := time.Now() | ||||
| 		ch := strategy.GetJobs(ctx) | ||||
|  | ||||
| 		done := make(chan struct{}) | ||||
| 		go func() { | ||||
| 			defer close(done) | ||||
| 			iteration := <-ch | ||||
| 			duration := time.Now().Sub(startedAt) | ||||
| 			require.True(t, duration >= d, fmt.Sprintf("has %d, expected %d", duration, d)) | ||||
| 			require.True(t, duration < d*2) | ||||
| 			require.Equal(t, jobs, iteration.Jobs) | ||||
| 			require.Equal(t, 0, iteration.Queue) | ||||
| 		}() | ||||
|  | ||||
| 		for _, j := range jobs { | ||||
| 			strategy.Queue(j) | ||||
| 		} | ||||
|  | ||||
| 		<-done | ||||
|  | ||||
| 		cancel() | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("should provide iteration as soon as the bus is full, without waiting for the previous iteration to finish", func(t *testing.T) { | ||||
| 		d := 20 * time.Millisecond | ||||
| 		strategy := NewFullBusStrategy(d, 10) | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
| 		ch := strategy.GetJobs(ctx) | ||||
|  | ||||
| 		done := make(chan struct{}) | ||||
| 		go func() { | ||||
| 			defer close(done) | ||||
| 			for i := 0; i < 3; i++ { | ||||
| 				time.Sleep(5 * time.Millisecond) // See comment below | ||||
| 				select { | ||||
| 				case iteration := <-ch: | ||||
| 					require.Len(t, iteration.Jobs, 10) | ||||
| 					// Don't assert iteration.Queue length since it might be unstable | ||||
| 					// Don't call iteration.Done() | ||||
| 				case <-time.After(d): | ||||
| 					t.Fatalf("iteration should be provided as soon as the bus is full") | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			// Scheduled 31 tasks. 3 iterations should be performed immediately | ||||
| 			// and should be executed only after timeout. The timeout above is used | ||||
| 			// to increase overall time to ensure, that timer resets on every iteration | ||||
|  | ||||
| 			startedAt := time.Now() | ||||
| 			iteration := <-ch | ||||
| 			duration := time.Now().Sub(startedAt) | ||||
| 			require.True(t, duration >= d) | ||||
| 			require.True(t, duration < d*2) | ||||
| 			require.Len(t, iteration.Jobs, 1) | ||||
| 			require.Equal(t, 0, iteration.Queue) | ||||
| 		}() | ||||
|  | ||||
| 		for i := 0; i < 31; i++ { | ||||
| 			strategy.Queue(&job{}) | ||||
| 		} | ||||
|  | ||||
| 		<-done | ||||
|  | ||||
| 		cancel() | ||||
| 	}) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user