Commit ef5d0ff1 authored by Matt Joiner's avatar Matt Joiner
Browse files

Use a send rate limiter implemented with STM

parent fc900234
package dht
import (
"golang.org/x/time/rate"
)
var defaultSendLimiter = rate.NewLimiter(100, 100)
var defaultSendLimiter = newRateLimiter(25, 25)
......@@ -9,6 +9,7 @@ require (
github.com/anacrolix/torrent v1.7.1
github.com/bradfitz/iter v0.0.0-20190303215204-33e6a9893b0c
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815
github.com/lukechampine/stm v0.0.0-20191022212748-05486c32d236
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.4.0
github.com/willf/bloom v2.0.3+incompatible
......
package dht
import (
"context"
"sync"
"time"
"github.com/lukechampine/stm"
)
type numTokens int
type rateLimiter struct {
max *stm.Var
cur *stm.Var
interval time.Duration
mu sync.Mutex
t *time.Timer
}
func newRateLimiter(rate float64, burst numTokens) *rateLimiter {
rl := &rateLimiter{
max: stm.NewVar(burst),
cur: stm.NewVar(burst),
interval: time.Duration(float64(1*time.Second) / rate),
}
rl.mu.Lock()
rl.t = time.AfterFunc(rl.interval, rl.timerCallback)
rl.mu.Unlock()
return rl
}
func (rl *rateLimiter) timerCallback() {
stm.Atomically(func(tx *stm.Tx) {
cur := tx.Get(rl.cur).(numTokens)
max := tx.Get(rl.max).(numTokens)
if cur < max {
tx.Set(rl.cur, cur+1)
}
})
rl.mu.Lock()
rl.t.Reset(rl.interval)
rl.mu.Unlock()
}
func (rl *rateLimiter) Allow() bool {
return stm.Atomically(func(tx *stm.Tx) {
tx.Return(rl.takeToken(tx))
}).(bool)
}
func (rl *rateLimiter) takeToken(tx *stm.Tx) bool {
cur := tx.Get(rl.cur).(numTokens)
if cur > 0 {
tx.Set(rl.cur, cur-1)
return true
}
return false
}
func (rl *rateLimiter) Wait(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctxDone := stm.NewVar(false)
go func() {
<-ctx.Done()
stm.AtomicSet(ctxDone, true)
}()
if err := stm.Atomically(func(tx *stm.Tx) {
if tx.Get(ctxDone).(bool) {
tx.Return(ctx.Err())
}
if rl.takeToken(tx) {
tx.Return(nil)
}
tx.Retry()
}); err != nil {
return err.(error)
}
return nil
}
......@@ -19,7 +19,6 @@ import (
"github.com/anacrolix/torrent/logonce"
"github.com/anacrolix/torrent/metainfo"
"github.com/pkg/errors"
"golang.org/x/time/rate"
"github.com/anacrolix/dht/v2/krpc"
)
......@@ -45,7 +44,10 @@ type Server struct {
tokenServer tokenServer // Manages tokens we issue to our queriers.
config ServerConfig
stats ServerStats
sendLimit *rate.Limiter
sendLimit interface {
Wait(ctx context.Context) error
Allow() bool
}
}
func (s *Server) numGoodNodes() (num int) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment