inital commit
This commit is contained in:
295
ratelimit.go
Normal file
295
ratelimit.go
Normal file
@@ -0,0 +1,295 @@
|
||||
package fa
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// Priority orders requests competing for the shared rate-limiter token. A
|
||||
// lower numeric value is a higher priority: the limiter always serves a
|
||||
// waiting request of higher priority before one of lower priority.
|
||||
//
|
||||
// Priority only decides the order in which competing requests reserve the
|
||||
// next token never how fast tokens are emitted. There is still a single
|
||||
// global token bucket, because FurAffinity rate-limits per account.
|
||||
//
|
||||
// Priority markers take effect only on a Client built with
|
||||
// [WithPrioritizedRateLimiting]. Without it every request is served in plain
|
||||
// FIFO order and the marker is inert so setting a priority is always safe.
|
||||
type Priority int
|
||||
|
||||
const (
|
||||
// PriorityInteractive is the highest priority: requests the user is
|
||||
// actively waiting on the page on screen, user write actions.
|
||||
PriorityInteractive Priority = iota
|
||||
|
||||
// PriorityNormal is the default priority of any request whose context
|
||||
// carries no priority marker.
|
||||
PriorityNormal
|
||||
|
||||
// PriorityLow is below normal: best-effort work that should yield to
|
||||
// user-driven traffic, such as speculative preloading of likely-next
|
||||
// submissions.
|
||||
PriorityLow
|
||||
|
||||
// PriorityBackground is the lowest priority: bulk background work such
|
||||
// as inbox or watchlist crawling. It yields to every other level.
|
||||
PriorityBackground
|
||||
)
|
||||
|
||||
// numPriorities is the count of defined Priority levels.
|
||||
const numPriorities = 4
|
||||
|
||||
// String returns the constant name of p, for log readability.
|
||||
func (p Priority) String() string {
|
||||
switch p {
|
||||
case PriorityInteractive:
|
||||
return "interactive"
|
||||
case PriorityNormal:
|
||||
return "normal"
|
||||
case PriorityLow:
|
||||
return "low"
|
||||
case PriorityBackground:
|
||||
return "background"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// priorityKey is the context key carrying a request's [Priority].
|
||||
type priorityKey struct{}
|
||||
|
||||
// WithPriority marks ctx and every SDK call made with it at priority p.
|
||||
// A context that carries no marker resolves to [PriorityNormal].
|
||||
//
|
||||
// A p outside [PriorityInteractive, PriorityBackground] is clamped into that
|
||||
// range; callers using the named constants never trigger this.
|
||||
//
|
||||
// This marker takes effect only on a Client built with
|
||||
// [WithPrioritizedRateLimiting]; otherwise it is inert. See [Priority].
|
||||
func WithPriority(ctx context.Context, p Priority) context.Context {
|
||||
if p < PriorityInteractive {
|
||||
p = PriorityInteractive
|
||||
}
|
||||
if p > PriorityBackground {
|
||||
p = PriorityBackground
|
||||
}
|
||||
return context.WithValue(ctx, priorityKey{}, p)
|
||||
}
|
||||
|
||||
// WithBackgroundPriority marks ctx and every SDK call made with it as
|
||||
// lowest-priority background work. It is exactly
|
||||
// WithPriority(ctx, [PriorityBackground]).
|
||||
//
|
||||
// A background request yields the rate limiter to every higher-priority
|
||||
// request: it will not take the next token while any higher-priority request
|
||||
// is queued for one. This does not change the overall pace there is still a
|
||||
// single global token bucket, because FA rate-limits per account only the
|
||||
// order in which competing requests are served.
|
||||
//
|
||||
// Use it for best-effort work (preloading, crawling) so it never delays a
|
||||
// request the user is actively waiting on. Background requests can be starved
|
||||
// indefinitely by a steady stream of higher-priority traffic; that is
|
||||
// intentional for best-effort work.
|
||||
//
|
||||
// This marker only takes effect on a Client built with
|
||||
// [WithPrioritizedRateLimiting]. On a client without it the marker is inert
|
||||
// and every request is served in plain FIFO order.
|
||||
func WithBackgroundPriority(ctx context.Context) context.Context {
|
||||
return WithPriority(ctx, PriorityBackground)
|
||||
}
|
||||
|
||||
// priorityOf reports the [Priority] ctx was decorated with, or
|
||||
// [PriorityNormal] when ctx is nil or carries no marker.
|
||||
func priorityOf(ctx context.Context) Priority {
|
||||
if ctx == nil {
|
||||
return PriorityNormal
|
||||
}
|
||||
p, ok := ctx.Value(priorityKey{}).(Priority)
|
||||
if !ok {
|
||||
return PriorityNormal
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// rateLimiter paces the HTTP requests the SDK makes.
|
||||
//
|
||||
// With priority scheduling disabled (the default) it is a plain FIFO token
|
||||
// bucket: [rate.Limiter] grants reservations in call order.
|
||||
//
|
||||
// With priority scheduling enabled it is instead a purpose-built
|
||||
// priority-aware token bucket. Tokens accrue at the same fixed rate, but a
|
||||
// waiting request consumes the next token only once no higher-priority
|
||||
// request is also waiting so the highest-priority waiter is always served
|
||||
// first. The emission rate is unchanged; priority only reorders who consumes
|
||||
// each token. Requests at the same priority level race for each token.
|
||||
type rateLimiter struct {
|
||||
lim *rate.Limiter // FIFO pacing; used when priority is disabled
|
||||
priority bool // honour Priority markers via the bucket below
|
||||
|
||||
// Priority-aware token bucket, used only when priority is enabled.
|
||||
mu sync.Mutex
|
||||
interval time.Duration // time to accrue one token
|
||||
burst float64 // token-bucket capacity
|
||||
tokens float64 // tokens currently available
|
||||
last time.Time // when tokens was last accrued
|
||||
waiters [numPriorities]int // requests currently waiting at each level
|
||||
changed chan struct{} // closed and replaced on every waiter-set change
|
||||
}
|
||||
|
||||
// newRateLimiter constructs a token-bucket limiter that emits one token per
|
||||
// interval, with the given burst size. A burst of 1 yields strict pacing;
|
||||
// burst > 1 lets short bursts through before the steady rate kicks in.
|
||||
//
|
||||
// priority enables priority-aware scheduling; when false the limiter is a
|
||||
// plain FIFO bucket and [Priority] markers are ignored.
|
||||
func newRateLimiter(interval time.Duration, burst int, priority bool) *rateLimiter {
|
||||
if interval <= 0 {
|
||||
interval = time.Second
|
||||
}
|
||||
if burst <= 0 {
|
||||
burst = 1
|
||||
}
|
||||
return &rateLimiter{
|
||||
lim: rate.NewLimiter(rate.Every(interval), burst),
|
||||
priority: priority,
|
||||
interval: interval,
|
||||
burst: float64(burst),
|
||||
tokens: float64(burst),
|
||||
last: time.Now(),
|
||||
changed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// wait blocks until a token is available or ctx is cancelled.
|
||||
//
|
||||
// With priority scheduling disabled it is a plain FIFO token reservation.
|
||||
// With it enabled the request waits until no higher-priority request is
|
||||
// queued and a token is available, then consumes it. Cancellation propagates
|
||||
// through both the limiter and the surrounding HTTP request.
|
||||
//
|
||||
// Requests at the same priority level are not ordered relative to one
|
||||
// another they race for each token. Lower-priority levels can be starved
|
||||
// indefinitely by a steady stream of higher-priority traffic; that is
|
||||
// intentional for best-effort background work.
|
||||
func (r *rateLimiter) wait(ctx context.Context) error {
|
||||
if r == nil || r.lim == nil {
|
||||
return nil
|
||||
}
|
||||
// Priority scheduling is opt-in (see WithPrioritizedRateLimiting). When
|
||||
// off, every request is a plain FIFO reservation and any Priority marker
|
||||
// on ctx is ignored.
|
||||
if !r.priority {
|
||||
return r.lim.Wait(ctx)
|
||||
}
|
||||
return r.waitPriority(ctx, priorityOf(ctx))
|
||||
}
|
||||
|
||||
// waitPriority serves a request at priority p from the priority-aware token
|
||||
// bucket. It registers the request so lower-priority requests defer to it,
|
||||
// then loops until it can consume a token: it sleeps until the bucket should
|
||||
// have refilled, waking early whenever the waiter set changes (a request
|
||||
// arrived or left) to re-evaluate its turn.
|
||||
func (r *rateLimiter) waitPriority(ctx context.Context, p Priority) error {
|
||||
r.register(p)
|
||||
defer r.unregister(p)
|
||||
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
r.refill()
|
||||
// The eligibility check and the consume happen in one critical
|
||||
// section, so a higher-priority register cannot interleave between
|
||||
// them.
|
||||
blocked := r.blockingAbove(p)
|
||||
if !blocked && r.tokens >= 1 {
|
||||
r.tokens--
|
||||
r.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
ch := r.changed
|
||||
var sleep time.Duration
|
||||
if !blocked {
|
||||
// Highest-priority waiter, but short of a token: sleep until the
|
||||
// bucket should have accrued one.
|
||||
sleep = time.Duration((1 - r.tokens) * float64(r.interval))
|
||||
}
|
||||
r.mu.Unlock()
|
||||
|
||||
// sleep <= 0 means this request is blocked behind a higher-priority
|
||||
// level: wait only for the waiter set to change (or for cancellation).
|
||||
if sleep <= 0 {
|
||||
select {
|
||||
case <-ch:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
timer := time.NewTimer(sleep)
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-ch:
|
||||
timer.Stop()
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// refill accrues tokens for the time elapsed since the last refill, capped at
|
||||
// the bucket's burst capacity. Caller must hold r.mu.
|
||||
func (r *rateLimiter) refill() {
|
||||
now := time.Now()
|
||||
r.tokens += float64(now.Sub(r.last)) / float64(r.interval)
|
||||
if r.tokens > r.burst {
|
||||
r.tokens = r.burst
|
||||
}
|
||||
r.last = now
|
||||
}
|
||||
|
||||
// blockingAbove reports whether any request strictly higher-priority than p
|
||||
// is currently waiting. Caller must hold r.mu.
|
||||
func (r *rateLimiter) blockingAbove(p Priority) bool {
|
||||
for q := PriorityInteractive; q < p; q++ {
|
||||
if r.waiters[q] > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// register records a request now waiting at priority p and wakes every other
|
||||
// waiter so it can re-evaluate its turn.
|
||||
func (r *rateLimiter) register(p Priority) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.waiters[p]++
|
||||
r.broadcast()
|
||||
}
|
||||
|
||||
// unregister records that a request waiting at priority p has finished (it
|
||||
// consumed a token, or its context was cancelled) and wakes every other
|
||||
// waiter so it can re-evaluate its turn.
|
||||
func (r *rateLimiter) unregister(p Priority) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.waiters[p]--
|
||||
r.broadcast()
|
||||
}
|
||||
|
||||
// broadcast wakes every goroutine selecting on the current changed channel
|
||||
// and installs a fresh one for the next wait. It intentionally over-wakes —
|
||||
// every waiter re-evaluates its turn even if its eligibility did not change.
|
||||
// Caller must hold r.mu.
|
||||
func (r *rateLimiter) broadcast() {
|
||||
close(r.changed)
|
||||
r.changed = make(chan struct{})
|
||||
}
|
||||
Reference in New Issue
Block a user