package fa import ( "context" "sync" "time" "golang.org/x/time/rate" ) // Priority orders requests competing for the shared rate-limiter token. A // lower numeric value is a higher priority: the limiter always serves a // waiting request of higher priority before one of lower priority. // // Priority only decides the order in which competing requests reserve the // next token never how fast tokens are emitted. There is still a single // global token bucket, because FurAffinity rate-limits per account. // // Priority markers take effect only on a Client built with // [WithPrioritizedRateLimiting]. Without it every request is served in plain // FIFO order and the marker is inert so setting a priority is always safe. type Priority int const ( // PriorityInteractive is the highest priority: requests the user is // actively waiting on the page on screen, user write actions. PriorityInteractive Priority = iota // PriorityNormal is the default priority of any request whose context // carries no priority marker. PriorityNormal // PriorityLow is below normal: best-effort work that should yield to // user-driven traffic, such as speculative preloading of likely-next // submissions. PriorityLow // PriorityBackground is the lowest priority: bulk background work such // as inbox or watchlist crawling. It yields to every other level. PriorityBackground ) // numPriorities is the count of defined Priority levels. const numPriorities = 4 // String returns the constant name of p, for log readability. func (p Priority) String() string { switch p { case PriorityInteractive: return "interactive" case PriorityNormal: return "normal" case PriorityLow: return "low" case PriorityBackground: return "background" default: return "unknown" } } // priorityKey is the context key carrying a request's [Priority]. type priorityKey struct{} // WithPriority marks ctx and every SDK call made with it at priority p. // A context that carries no marker resolves to [PriorityNormal]. // // A p outside [PriorityInteractive, PriorityBackground] is clamped into that // range; callers using the named constants never trigger this. // // This marker takes effect only on a Client built with // [WithPrioritizedRateLimiting]; otherwise it is inert. See [Priority]. func WithPriority(ctx context.Context, p Priority) context.Context { if p < PriorityInteractive { p = PriorityInteractive } if p > PriorityBackground { p = PriorityBackground } return context.WithValue(ctx, priorityKey{}, p) } // WithBackgroundPriority marks ctx and every SDK call made with it as // lowest-priority background work. It is exactly // WithPriority(ctx, [PriorityBackground]). // // A background request yields the rate limiter to every higher-priority // request: it will not take the next token while any higher-priority request // is queued for one. This does not change the overall pace there is still a // single global token bucket, because FA rate-limits per account only the // order in which competing requests are served. // // Use it for best-effort work (preloading, crawling) so it never delays a // request the user is actively waiting on. Background requests can be starved // indefinitely by a steady stream of higher-priority traffic; that is // intentional for best-effort work. // // This marker only takes effect on a Client built with // [WithPrioritizedRateLimiting]. On a client without it the marker is inert // and every request is served in plain FIFO order. func WithBackgroundPriority(ctx context.Context) context.Context { return WithPriority(ctx, PriorityBackground) } // priorityOf reports the [Priority] ctx was decorated with, or // [PriorityNormal] when ctx is nil or carries no marker. func priorityOf(ctx context.Context) Priority { if ctx == nil { return PriorityNormal } p, ok := ctx.Value(priorityKey{}).(Priority) if !ok { return PriorityNormal } return p } // rateLimiter paces the HTTP requests the SDK makes. // // With priority scheduling disabled (the default) it is a plain FIFO token // bucket: [rate.Limiter] grants reservations in call order. // // With priority scheduling enabled it is instead a purpose-built // priority-aware token bucket. Tokens accrue at the same fixed rate, but a // waiting request consumes the next token only once no higher-priority // request is also waiting so the highest-priority waiter is always served // first. The emission rate is unchanged; priority only reorders who consumes // each token. Requests at the same priority level race for each token. type rateLimiter struct { lim *rate.Limiter // FIFO pacing; used when priority is disabled priority bool // honour Priority markers via the bucket below // Priority-aware token bucket, used only when priority is enabled. mu sync.Mutex interval time.Duration // time to accrue one token burst float64 // token-bucket capacity tokens float64 // tokens currently available last time.Time // when tokens was last accrued waiters [numPriorities]int // requests currently waiting at each level changed chan struct{} // closed and replaced on every waiter-set change } // newRateLimiter constructs a token-bucket limiter that emits one token per // interval, with the given burst size. A burst of 1 yields strict pacing; // burst > 1 lets short bursts through before the steady rate kicks in. // // priority enables priority-aware scheduling; when false the limiter is a // plain FIFO bucket and [Priority] markers are ignored. func newRateLimiter(interval time.Duration, burst int, priority bool) *rateLimiter { if interval <= 0 { interval = time.Second } if burst <= 0 { burst = 1 } return &rateLimiter{ lim: rate.NewLimiter(rate.Every(interval), burst), priority: priority, interval: interval, burst: float64(burst), tokens: float64(burst), last: time.Now(), changed: make(chan struct{}), } } // wait blocks until a token is available or ctx is cancelled. // // With priority scheduling disabled it is a plain FIFO token reservation. // With it enabled the request waits until no higher-priority request is // queued and a token is available, then consumes it. Cancellation propagates // through both the limiter and the surrounding HTTP request. // // Requests at the same priority level are not ordered relative to one // another they race for each token. Lower-priority levels can be starved // indefinitely by a steady stream of higher-priority traffic; that is // intentional for best-effort background work. func (r *rateLimiter) wait(ctx context.Context) error { if r == nil || r.lim == nil { return nil } // Priority scheduling is opt-in (see WithPrioritizedRateLimiting). When // off, every request is a plain FIFO reservation and any Priority marker // on ctx is ignored. if !r.priority { return r.lim.Wait(ctx) } return r.waitPriority(ctx, priorityOf(ctx)) } // waitPriority serves a request at priority p from the priority-aware token // bucket. It registers the request so lower-priority requests defer to it, // then loops until it can consume a token: it sleeps until the bucket should // have refilled, waking early whenever the waiter set changes (a request // arrived or left) to re-evaluate its turn. func (r *rateLimiter) waitPriority(ctx context.Context, p Priority) error { r.register(p) defer r.unregister(p) for { if err := ctx.Err(); err != nil { return err } r.mu.Lock() r.refill() // The eligibility check and the consume happen in one critical // section, so a higher-priority register cannot interleave between // them. blocked := r.blockingAbove(p) if !blocked && r.tokens >= 1 { r.tokens-- r.mu.Unlock() return nil } ch := r.changed var sleep time.Duration if !blocked { // Highest-priority waiter, but short of a token: sleep until the // bucket should have accrued one. sleep = time.Duration((1 - r.tokens) * float64(r.interval)) } r.mu.Unlock() // sleep <= 0 means this request is blocked behind a higher-priority // level: wait only for the waiter set to change (or for cancellation). if sleep <= 0 { select { case <-ch: case <-ctx.Done(): return ctx.Err() } continue } timer := time.NewTimer(sleep) select { case <-timer.C: case <-ch: timer.Stop() case <-ctx.Done(): timer.Stop() return ctx.Err() } } } // refill accrues tokens for the time elapsed since the last refill, capped at // the bucket's burst capacity. Caller must hold r.mu. func (r *rateLimiter) refill() { now := time.Now() r.tokens += float64(now.Sub(r.last)) / float64(r.interval) if r.tokens > r.burst { r.tokens = r.burst } r.last = now } // blockingAbove reports whether any request strictly higher-priority than p // is currently waiting. Caller must hold r.mu. func (r *rateLimiter) blockingAbove(p Priority) bool { for q := PriorityInteractive; q < p; q++ { if r.waiters[q] > 0 { return true } } return false } // register records a request now waiting at priority p and wakes every other // waiter so it can re-evaluate its turn. func (r *rateLimiter) register(p Priority) { r.mu.Lock() defer r.mu.Unlock() r.waiters[p]++ r.broadcast() } // unregister records that a request waiting at priority p has finished (it // consumed a token, or its context was cancelled) and wakes every other // waiter so it can re-evaluate its turn. func (r *rateLimiter) unregister(p Priority) { r.mu.Lock() defer r.mu.Unlock() r.waiters[p]-- r.broadcast() } // broadcast wakes every goroutine selecting on the current changed channel // and installs a fresh one for the next wait. It intentionally over-wakes — // every waiter re-evaluates its turn even if its eligibility did not change. // Caller must hold r.mu. func (r *rateLimiter) broadcast() { close(r.changed) r.changed = make(chan struct{}) }