Files
go-fa-api/ratelimit.go
2026-05-25 22:27:18 +02:00

296 lines
9.7 KiB
Go

package fa
import (
"context"
"sync"
"time"
"golang.org/x/time/rate"
)
// Priority orders requests competing for the shared rate-limiter token. A
// lower numeric value is a higher priority: the limiter always serves a
// waiting request of higher priority before one of lower priority.
//
// Priority only decides the order in which competing requests reserve the
// next token, never how fast tokens are emitted. There is still a single
// global token bucket, because FurAffinity rate-limits per account.
//
// Priority markers take effect only on a Client built with
// [WithPrioritizedRateLimiting]. Without it every request is served in plain
// FIFO order and the marker is inert, so setting a priority is always safe.
//
// Attach a priority to a request with [WithPriority]. A context that carries
// no marker is treated as [PriorityNormal].
type Priority int
const (
// PriorityInteractive is the highest priority: requests the user is
// actively waiting on, such as the page on screen or user write actions.
// It is the zero value of Priority, but it is not the default: a context
// without a marker resolves to PriorityNormal.
PriorityInteractive Priority = iota
// PriorityNormal is the default priority of any request whose context
// carries no priority marker.
PriorityNormal
// PriorityLow is below normal: best-effort work that should yield to
// user-driven traffic, such as speculative preloading of likely-next
// submissions.
PriorityLow
// PriorityBackground is the lowest priority: bulk background work such
// as inbox or watchlist crawling. It yields to every other level.
PriorityBackground
)
// numPriorities is the count of defined Priority levels. It sizes the
// per-level waiter counters in rateLimiter, which are indexed directly by
// Priority value, so it must be updated whenever a Priority constant is added
// or removed.
const numPriorities = 4
// String implements fmt.Stringer. It yields a short lowercase label for p so
// that log output stays readable. Any value that is not one of the named
// Priority constants is reported as "unknown".
func (p Priority) String() string {
	labels := [...]string{
		PriorityInteractive: "interactive",
		PriorityNormal:      "normal",
		PriorityLow:         "low",
		PriorityBackground:  "background",
	}
	if p < 0 || int(p) >= len(labels) {
		return "unknown"
	}
	return labels[p]
}
// priorityKey is the context key carrying a request's [Priority]. Being an
// unexported type, it cannot collide with context keys defined by other
// packages. The value is stored by WithPriority and read back by priorityOf.
type priorityKey struct{}
// WithPriority returns a child of ctx that tags every SDK call made with it
// as running at priority p. A context without such a tag resolves to
// [PriorityNormal].
//
// Values of p outside [PriorityInteractive, PriorityBackground] are clamped
// to the nearest end of that range. Callers that stick to the named constants
// never hit the clamp.
//
// The tag is honoured only by a Client built with
// [WithPrioritizedRateLimiting]; on any other client it is inert. See
// [Priority].
func WithPriority(ctx context.Context, p Priority) context.Context {
	level := p
	switch {
	case level < PriorityInteractive:
		level = PriorityInteractive
	case level > PriorityBackground:
		level = PriorityBackground
	}
	return context.WithValue(ctx, priorityKey{}, level)
}
// WithBackgroundPriority marks ctx and every SDK call made with it as
// lowest-priority background work. It is exactly
// WithPriority(ctx, [PriorityBackground]).
//
// A background request yields the rate limiter to every higher-priority
// request: it will not take the next token while any higher-priority request
// is queued for one. This changes only the order in which competing requests
// are served, not the overall pace: there is still a single global token
// bucket, because FA rate-limits per account.
//
// Use it for best-effort work (preloading, crawling) so it never delays a
// request the user is actively waiting on. Background requests can be starved
// indefinitely by a steady stream of higher-priority traffic; that is
// intentional for best-effort work.
//
// This marker only takes effect on a Client built with
// [WithPrioritizedRateLimiting]. On a client without it the marker is inert
// and every request is served in plain FIFO order.
func WithBackgroundPriority(ctx context.Context) context.Context {
return WithPriority(ctx, PriorityBackground)
}
// priorityOf extracts the [Priority] that WithPriority attached to ctx. It
// falls back to [PriorityNormal] when ctx is nil, when no marker is present,
// or when the value stored under the key is not a Priority.
func priorityOf(ctx context.Context) Priority {
	if ctx != nil {
		if marked, found := ctx.Value(priorityKey{}).(Priority); found {
			return marked
		}
	}
	return PriorityNormal
}
// rateLimiter paces the HTTP requests the SDK makes.
//
// With priority scheduling disabled (the default) it is a plain FIFO token
// bucket: [rate.Limiter] grants reservations in call order.
//
// With priority scheduling enabled it is instead a purpose-built
// priority-aware token bucket. Tokens accrue at the same fixed rate, but a
// waiting request consumes the next token only once no higher-priority
// request is also waiting, so the highest-priority waiter is always served
// first. The emission rate is unchanged; priority only reorders who consumes
// each token. Requests at the same priority level race for each token.
//
// A rateLimiter embeds a sync.Mutex and must not be copied after first use;
// build it with newRateLimiter and share it by pointer.
type rateLimiter struct {
lim *rate.Limiter // FIFO pacing; used when priority is disabled. wait treats a nil lim as "no limiting", in either mode
priority bool // honour Priority markers via the bucket below
// Priority-aware token bucket, used only when priority is enabled.
mu sync.Mutex // guards tokens, last, waiters and changed
interval time.Duration // time to accrue one token
burst float64 // token-bucket capacity
tokens float64 // tokens currently available (fractional while a token is accruing)
last time.Time // when tokens was last accrued
waiters [numPriorities]int // requests currently waiting at each level, indexed by Priority
changed chan struct{} // closed and replaced on every waiter-set change
}
// newRateLimiter builds a token-bucket limiter that releases one token every
// interval and holds at most burst tokens. With burst == 1 requests are
// strictly paced; a larger burst lets a short spike through before the steady
// rate applies.
//
// A non-positive interval falls back to one second and a non-positive burst
// to 1. The priority-aware bucket starts full.
//
// priority selects priority-aware scheduling. When it is false the limiter is
// a plain FIFO bucket and [Priority] markers are ignored.
func newRateLimiter(interval time.Duration, burst int, priority bool) *rateLimiter {
	every := interval
	if every <= 0 {
		every = time.Second
	}
	capacity := burst
	if capacity <= 0 {
		capacity = 1
	}

	rl := &rateLimiter{
		priority: priority,
		interval: every,
		burst:    float64(capacity),
		changed:  make(chan struct{}),
	}
	rl.lim = rate.NewLimiter(rate.Every(every), capacity)
	rl.tokens = rl.burst
	rl.last = time.Now()
	return rl
}
// wait blocks until the caller may issue a request, or until ctx is
// cancelled, in which case the context's error is returned.
//
// A nil receiver, or a limiter with no underlying FIFO limiter, imposes no
// limit and returns nil at once.
//
// Priority scheduling is opt-in (see WithPrioritizedRateLimiting). When it is
// off, the call is a plain FIFO token reservation and any Priority marker on
// ctx is ignored. When it is on, the request waits until no higher-priority
// request is queued and a token is available, then consumes it.
//
// Requests at the same priority level are not ordered relative to one
// another; they race for each token. Lower-priority levels can be starved
// indefinitely by a steady stream of higher-priority traffic, which is
// intentional for best-effort background work.
func (r *rateLimiter) wait(ctx context.Context) error {
	switch {
	case r == nil || r.lim == nil:
		return nil
	case r.priority:
		return r.waitPriority(ctx, priorityOf(ctx))
	default:
		return r.lim.Wait(ctx)
	}
}
// waitPriority serves a request at priority p from the priority-aware token
// bucket. It registers the request so lower-priority requests defer to it,
// then loops until it can consume a token. The request is unregistered again
// on every return path.
//
// On each pass the request is in one of three states:
//
//   - eligible (no higher-priority waiter) with a token available: consume
//     the token and return nil;
//   - eligible but short of a token: sleep until the bucket should have
//     accrued one, waking early if the waiter set changes;
//   - blocked behind a higher-priority waiter: wait only for the waiter set
//     to change.
//
// In every state a cancelled ctx ends the wait and its error is returned.
func (r *rateLimiter) waitPriority(ctx context.Context, p Priority) error {
	// maxSleep is the longest representable Duration. It caps the sleep when
	// the token deficit, measured in nanoseconds, would overflow the
	// float-to-Duration conversion (absurdly long intervals).
	const maxSleep = time.Duration(1<<63 - 1)

	r.register(p)
	defer r.unregister(p)

	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		r.mu.Lock()
		r.refill()
		// The eligibility check and the consume happen in one critical
		// section, so a higher-priority register cannot interleave between
		// them.
		blocked := r.blockingAbove(p)
		if !blocked && r.tokens >= 1 {
			r.tokens--
			r.mu.Unlock()
			return nil
		}
		// Capture the channel under the lock: any waiter-set change after
		// this point closes exactly this channel, so no wake-up is missed.
		ch := r.changed
		var sleep time.Duration
		if !blocked {
			// Highest-priority waiter, but short of a token: sleep until the
			// bucket should have accrued one. The deficit is clamped to
			// [1ns, maxSleep]. A sub-nanosecond deficit would otherwise
			// truncate to zero, and an overflowing one would turn negative.
			// Either value used to be mistaken for "blocked" and left this
			// request waiting with no timer at all.
			deficit := (1 - r.tokens) * float64(r.interval)
			switch {
			case deficit < 1:
				sleep = time.Nanosecond
			case deficit >= float64(maxSleep):
				sleep = maxSleep
			default:
				sleep = time.Duration(deficit)
			}
		}
		r.mu.Unlock()

		if blocked {
			// Behind a higher-priority level: time alone cannot make this
			// request eligible, so wait only for the waiter set to change
			// (or for cancellation).
			select {
			case <-ch:
			case <-ctx.Done():
				return ctx.Err()
			}
			continue
		}

		timer := time.NewTimer(sleep)
		select {
		case <-timer.C:
		case <-ch:
			timer.Stop()
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
}
// refill credits the bucket with the tokens earned since the previous refill,
// one token per interval elapsed, never exceeding the burst capacity. It then
// stamps the current time as the new accrual point. The caller must hold
// r.mu.
func (r *rateLimiter) refill() {
	now := time.Now()
	elapsed := now.Sub(r.last)
	r.last = now

	accrued := r.tokens + float64(elapsed)/float64(r.interval)
	if accrued > r.burst {
		accrued = r.burst
	}
	r.tokens = accrued
}
// blockingAbove tells whether at least one request is waiting at a level
// strictly more urgent than p, that is, at a numerically smaller Priority.
// Waiters at p itself or below never block p. The caller must hold r.mu.
func (r *rateLimiter) blockingAbove(p Priority) bool {
	// Walk up from the most urgent level and stop at the first level that
	// has a waiter, or on reaching p.
	level := PriorityInteractive
	for level < p && r.waiters[level] <= 0 {
		level++
	}
	return level < p
}
// register notes that one more request is now waiting at priority p, then
// wakes every waiter so each can re-evaluate its turn.
func (r *rateLimiter) register(p Priority) {
	r.adjustWaiters(p, 1)
}

// unregister notes that a request waiting at priority p is done, either
// because it consumed a token or because its context was cancelled. It then
// wakes every waiter so each can re-evaluate its turn.
func (r *rateLimiter) unregister(p Priority) {
	r.adjustWaiters(p, -1)
}

// adjustWaiters applies delta to the waiter count of level p and broadcasts
// the change, all under r.mu.
func (r *rateLimiter) adjustWaiters(p Priority, delta int) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.waiters[p] += delta
	r.broadcast()
}
// broadcast releases every goroutine currently selecting on the changed
// channel by closing it, and puts a fresh channel in place for later waits.
// Over-waking is deliberate: each waiter re-checks its turn whether or not
// its eligibility actually changed. The caller must hold r.mu.
func (r *rateLimiter) broadcast() {
	stale := r.changed
	r.changed = make(chan struct{})
	close(stale)
}