296 lines
9.7 KiB
Go
296 lines
9.7 KiB
Go
package fa
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
// Priority orders requests competing for the shared rate-limiter token. A
// lower numeric value is a higher priority: the limiter always serves a
// waiting request of higher priority before one of lower priority.
//
// Priority only decides the order in which competing requests reserve the
// next token — never how fast tokens are emitted. There is still a single
// global token bucket, because FurAffinity rate-limits per account.
//
// Priority markers take effect only on a Client built with
// [WithPrioritizedRateLimiting]. Without it every request is served in plain
// FIFO order and the marker is inert, so setting a priority is always safe.
type Priority int
|
|
|
|
// The defined priority levels, from highest (numerically lowest) to lowest.
// The values are consecutive starting at zero, so a Priority can be used
// directly as an index into a per-level array.
const (
	// PriorityInteractive is the highest priority: requests the user is
	// actively waiting on — the page on screen, user write actions.
	PriorityInteractive Priority = iota

	// PriorityNormal is the default priority of any request whose context
	// carries no priority marker.
	PriorityNormal

	// PriorityLow is below normal: best-effort work that should yield to
	// user-driven traffic, such as speculative preloading of likely-next
	// submissions.
	PriorityLow

	// PriorityBackground is the lowest priority: bulk background work such
	// as inbox or watchlist crawling. It yields to every other level.
	PriorityBackground
)
|
|
|
|
// numPriorities is the count of defined Priority levels. It sizes the
// per-level waiter array in rateLimiter, which is indexed by Priority, so it
// must be kept in sync with the Priority constants: it has to equal
// PriorityBackground + 1.
const numPriorities = 4
|
|
|
|
// String returns the constant name of p, for log readability.
|
|
func (p Priority) String() string {
|
|
switch p {
|
|
case PriorityInteractive:
|
|
return "interactive"
|
|
case PriorityNormal:
|
|
return "normal"
|
|
case PriorityLow:
|
|
return "low"
|
|
case PriorityBackground:
|
|
return "background"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
// priorityKey is the context key carrying a request's [Priority]. It is an
// unexported empty struct type, so only this package can store or read the
// value: WithPriority writes it and priorityOf reads it back.
type priorityKey struct{}
|
|
|
|
// WithPriority marks ctx and every SDK call made with it at priority p.
|
|
// A context that carries no marker resolves to [PriorityNormal].
|
|
//
|
|
// A p outside [PriorityInteractive, PriorityBackground] is clamped into that
|
|
// range; callers using the named constants never trigger this.
|
|
//
|
|
// This marker takes effect only on a Client built with
|
|
// [WithPrioritizedRateLimiting]; otherwise it is inert. See [Priority].
|
|
func WithPriority(ctx context.Context, p Priority) context.Context {
|
|
if p < PriorityInteractive {
|
|
p = PriorityInteractive
|
|
}
|
|
if p > PriorityBackground {
|
|
p = PriorityBackground
|
|
}
|
|
return context.WithValue(ctx, priorityKey{}, p)
|
|
}
|
|
|
|
// WithBackgroundPriority marks ctx and every SDK call made with it as
|
|
// lowest-priority background work. It is exactly
|
|
// WithPriority(ctx, [PriorityBackground]).
|
|
//
|
|
// A background request yields the rate limiter to every higher-priority
|
|
// request: it will not take the next token while any higher-priority request
|
|
// is queued for one. This does not change the overall pace there is still a
|
|
// single global token bucket, because FA rate-limits per account only the
|
|
// order in which competing requests are served.
|
|
//
|
|
// Use it for best-effort work (preloading, crawling) so it never delays a
|
|
// request the user is actively waiting on. Background requests can be starved
|
|
// indefinitely by a steady stream of higher-priority traffic; that is
|
|
// intentional for best-effort work.
|
|
//
|
|
// This marker only takes effect on a Client built with
|
|
// [WithPrioritizedRateLimiting]. On a client without it the marker is inert
|
|
// and every request is served in plain FIFO order.
|
|
func WithBackgroundPriority(ctx context.Context) context.Context {
|
|
return WithPriority(ctx, PriorityBackground)
|
|
}
|
|
|
|
// priorityOf reports the [Priority] ctx was decorated with, or
|
|
// [PriorityNormal] when ctx is nil or carries no marker.
|
|
func priorityOf(ctx context.Context) Priority {
|
|
if ctx == nil {
|
|
return PriorityNormal
|
|
}
|
|
p, ok := ctx.Value(priorityKey{}).(Priority)
|
|
if !ok {
|
|
return PriorityNormal
|
|
}
|
|
return p
|
|
}
|
|
|
|
// rateLimiter paces the HTTP requests the SDK makes.
//
// With priority scheduling disabled (the default) it is a plain FIFO token
// bucket: [rate.Limiter] grants reservations in call order.
//
// With priority scheduling enabled it is instead a purpose-built
// priority-aware token bucket. Tokens accrue at the same fixed rate, but a
// waiting request consumes the next token only once no higher-priority
// request is also waiting, so the highest-priority waiter is always served
// first. The emission rate is unchanged; priority only reorders who consumes
// each token. Requests at the same priority level race for each token.
//
// The struct contains a sync.Mutex and must not be copied; all methods use a
// pointer receiver.
type rateLimiter struct {
	lim      *rate.Limiter // FIFO pacing; used when priority is disabled
	priority bool          // honour Priority markers via the bucket below

	// Priority-aware token bucket, used only when priority is enabled.
	// mu guards tokens, last, waiters and changed; interval and burst are
	// set once by newRateLimiter and only read afterwards.
	mu       sync.Mutex
	interval time.Duration      // time to accrue one token
	burst    float64            // token-bucket capacity
	tokens   float64            // tokens currently available (may be fractional)
	last     time.Time          // when tokens was last accrued
	waiters  [numPriorities]int // requests currently waiting at each level, indexed by Priority
	changed  chan struct{}      // closed and replaced on every waiter-set change
}
|
|
|
|
// newRateLimiter constructs a token-bucket limiter that emits one token per
|
|
// interval, with the given burst size. A burst of 1 yields strict pacing;
|
|
// burst > 1 lets short bursts through before the steady rate kicks in.
|
|
//
|
|
// priority enables priority-aware scheduling; when false the limiter is a
|
|
// plain FIFO bucket and [Priority] markers are ignored.
|
|
func newRateLimiter(interval time.Duration, burst int, priority bool) *rateLimiter {
|
|
if interval <= 0 {
|
|
interval = time.Second
|
|
}
|
|
if burst <= 0 {
|
|
burst = 1
|
|
}
|
|
return &rateLimiter{
|
|
lim: rate.NewLimiter(rate.Every(interval), burst),
|
|
priority: priority,
|
|
interval: interval,
|
|
burst: float64(burst),
|
|
tokens: float64(burst),
|
|
last: time.Now(),
|
|
changed: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// wait blocks until a token is available or ctx is cancelled.
|
|
//
|
|
// With priority scheduling disabled it is a plain FIFO token reservation.
|
|
// With it enabled the request waits until no higher-priority request is
|
|
// queued and a token is available, then consumes it. Cancellation propagates
|
|
// through both the limiter and the surrounding HTTP request.
|
|
//
|
|
// Requests at the same priority level are not ordered relative to one
|
|
// another they race for each token. Lower-priority levels can be starved
|
|
// indefinitely by a steady stream of higher-priority traffic; that is
|
|
// intentional for best-effort background work.
|
|
func (r *rateLimiter) wait(ctx context.Context) error {
|
|
if r == nil || r.lim == nil {
|
|
return nil
|
|
}
|
|
// Priority scheduling is opt-in (see WithPrioritizedRateLimiting). When
|
|
// off, every request is a plain FIFO reservation and any Priority marker
|
|
// on ctx is ignored.
|
|
if !r.priority {
|
|
return r.lim.Wait(ctx)
|
|
}
|
|
return r.waitPriority(ctx, priorityOf(ctx))
|
|
}
|
|
|
|
// waitPriority serves a request at priority p from the priority-aware token
|
|
// bucket. It registers the request so lower-priority requests defer to it,
|
|
// then loops until it can consume a token: it sleeps until the bucket should
|
|
// have refilled, waking early whenever the waiter set changes (a request
|
|
// arrived or left) to re-evaluate its turn.
|
|
func (r *rateLimiter) waitPriority(ctx context.Context, p Priority) error {
|
|
r.register(p)
|
|
defer r.unregister(p)
|
|
|
|
for {
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.mu.Lock()
|
|
r.refill()
|
|
// The eligibility check and the consume happen in one critical
|
|
// section, so a higher-priority register cannot interleave between
|
|
// them.
|
|
blocked := r.blockingAbove(p)
|
|
if !blocked && r.tokens >= 1 {
|
|
r.tokens--
|
|
r.mu.Unlock()
|
|
return nil
|
|
}
|
|
ch := r.changed
|
|
var sleep time.Duration
|
|
if !blocked {
|
|
// Highest-priority waiter, but short of a token: sleep until the
|
|
// bucket should have accrued one.
|
|
sleep = time.Duration((1 - r.tokens) * float64(r.interval))
|
|
}
|
|
r.mu.Unlock()
|
|
|
|
// sleep <= 0 means this request is blocked behind a higher-priority
|
|
// level: wait only for the waiter set to change (or for cancellation).
|
|
if sleep <= 0 {
|
|
select {
|
|
case <-ch:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
continue
|
|
}
|
|
timer := time.NewTimer(sleep)
|
|
select {
|
|
case <-timer.C:
|
|
case <-ch:
|
|
timer.Stop()
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
// refill accrues tokens for the time elapsed since the last refill, capped at
|
|
// the bucket's burst capacity. Caller must hold r.mu.
|
|
func (r *rateLimiter) refill() {
|
|
now := time.Now()
|
|
r.tokens += float64(now.Sub(r.last)) / float64(r.interval)
|
|
if r.tokens > r.burst {
|
|
r.tokens = r.burst
|
|
}
|
|
r.last = now
|
|
}
|
|
|
|
// blockingAbove reports whether any request strictly higher-priority than p
|
|
// is currently waiting. Caller must hold r.mu.
|
|
func (r *rateLimiter) blockingAbove(p Priority) bool {
|
|
for q := PriorityInteractive; q < p; q++ {
|
|
if r.waiters[q] > 0 {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// register records a request now waiting at priority p and wakes every other
|
|
// waiter so it can re-evaluate its turn.
|
|
func (r *rateLimiter) register(p Priority) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.waiters[p]++
|
|
r.broadcast()
|
|
}
|
|
|
|
// unregister records that a request waiting at priority p has finished (it
|
|
// consumed a token, or its context was cancelled) and wakes every other
|
|
// waiter so it can re-evaluate its turn.
|
|
func (r *rateLimiter) unregister(p Priority) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.waiters[p]--
|
|
r.broadcast()
|
|
}
|
|
|
|
// broadcast wakes every goroutine selecting on the current changed channel
|
|
// and installs a fresh one for the next wait. It intentionally over-wakes —
|
|
// every waiter re-evaluates its turn even if its eligibility did not change.
|
|
// Caller must hold r.mu.
|
|
func (r *rateLimiter) broadcast() {
|
|
close(r.changed)
|
|
r.changed = make(chan struct{})
|
|
}
|