Chore: use generics as possible
This commit is contained in:
@@ -5,10 +5,10 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Option = func(b *Batch)
|
||||
type Option[T any] func(b *Batch[T])
|
||||
|
||||
type Result struct {
|
||||
Value any
|
||||
type Result[T any] struct {
|
||||
Value T
|
||||
Err error
|
||||
}
|
||||
|
||||
@@ -17,8 +17,8 @@ type Error struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func WithConcurrencyNum(n int) Option {
|
||||
return func(b *Batch) {
|
||||
func WithConcurrencyNum[T any](n int) Option[T] {
|
||||
return func(b *Batch[T]) {
|
||||
q := make(chan struct{}, n)
|
||||
for i := 0; i < n; i++ {
|
||||
q <- struct{}{}
|
||||
@@ -28,8 +28,8 @@ func WithConcurrencyNum(n int) Option {
|
||||
}
|
||||
|
||||
// Batch similar to errgroup, but can control the maximum number of concurrent
|
||||
type Batch struct {
|
||||
result map[string]Result
|
||||
type Batch[T any] struct {
|
||||
result map[string]Result[T]
|
||||
queue chan struct{}
|
||||
wg sync.WaitGroup
|
||||
mux sync.Mutex
|
||||
@@ -38,7 +38,7 @@ type Batch struct {
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func (b *Batch) Go(key string, fn func() (any, error)) {
|
||||
func (b *Batch[T]) Go(key string, fn func() (T, error)) {
|
||||
b.wg.Add(1)
|
||||
go func() {
|
||||
defer b.wg.Done()
|
||||
@@ -59,14 +59,14 @@ func (b *Batch) Go(key string, fn func() (any, error)) {
|
||||
})
|
||||
}
|
||||
|
||||
ret := Result{value, err}
|
||||
ret := Result[T]{value, err}
|
||||
b.mux.Lock()
|
||||
defer b.mux.Unlock()
|
||||
b.result[key] = ret
|
||||
}()
|
||||
}
|
||||
|
||||
func (b *Batch) Wait() *Error {
|
||||
func (b *Batch[T]) Wait() *Error {
|
||||
b.wg.Wait()
|
||||
if b.cancel != nil {
|
||||
b.cancel()
|
||||
@@ -74,26 +74,26 @@ func (b *Batch) Wait() *Error {
|
||||
return b.err
|
||||
}
|
||||
|
||||
func (b *Batch) WaitAndGetResult() (map[string]Result, *Error) {
|
||||
func (b *Batch[T]) WaitAndGetResult() (map[string]Result[T], *Error) {
|
||||
err := b.Wait()
|
||||
return b.Result(), err
|
||||
}
|
||||
|
||||
func (b *Batch) Result() map[string]Result {
|
||||
func (b *Batch[T]) Result() map[string]Result[T] {
|
||||
b.mux.Lock()
|
||||
defer b.mux.Unlock()
|
||||
copy := map[string]Result{}
|
||||
copyM := map[string]Result[T]{}
|
||||
for k, v := range b.result {
|
||||
copy[k] = v
|
||||
copyM[k] = v
|
||||
}
|
||||
return copy
|
||||
return copyM
|
||||
}
|
||||
|
||||
func New(ctx context.Context, opts ...Option) (*Batch, context.Context) {
|
||||
func New[T any](ctx context.Context, opts ...Option[T]) (*Batch[T], context.Context) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
b := &Batch{
|
||||
result: map[string]Result{},
|
||||
b := &Batch[T]{
|
||||
result: map[string]Result[T]{},
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
|
||||
@@ -11,14 +11,14 @@ import (
|
||||
)
|
||||
|
||||
func TestBatch(t *testing.T) {
|
||||
b, _ := New(context.Background())
|
||||
b, _ := New[string](context.Background())
|
||||
|
||||
now := time.Now()
|
||||
b.Go("foo", func() (any, error) {
|
||||
b.Go("foo", func() (string, error) {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
return "foo", nil
|
||||
})
|
||||
b.Go("bar", func() (any, error) {
|
||||
b.Go("bar", func() (string, error) {
|
||||
time.Sleep(time.Millisecond * 150)
|
||||
return "bar", nil
|
||||
})
|
||||
@@ -32,20 +32,20 @@ func TestBatch(t *testing.T) {
|
||||
|
||||
for k, v := range result {
|
||||
assert.NoError(t, v.Err)
|
||||
assert.Equal(t, k, v.Value.(string))
|
||||
assert.Equal(t, k, v.Value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchWithConcurrencyNum(t *testing.T) {
|
||||
b, _ := New(
|
||||
b, _ := New[string](
|
||||
context.Background(),
|
||||
WithConcurrencyNum(3),
|
||||
WithConcurrencyNum[string](3),
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
for i := 0; i < 7; i++ {
|
||||
idx := i
|
||||
b.Go(strconv.Itoa(idx), func() (any, error) {
|
||||
b.Go(strconv.Itoa(idx), func() (string, error) {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
return strconv.Itoa(idx), nil
|
||||
})
|
||||
@@ -57,21 +57,21 @@ func TestBatchWithConcurrencyNum(t *testing.T) {
|
||||
|
||||
for k, v := range result {
|
||||
assert.NoError(t, v.Err)
|
||||
assert.Equal(t, k, v.Value.(string))
|
||||
assert.Equal(t, k, v.Value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchContext(t *testing.T) {
|
||||
b, ctx := New(context.Background())
|
||||
b, ctx := New[string](context.Background())
|
||||
|
||||
b.Go("error", func() (any, error) {
|
||||
b.Go("error", func() (string, error) {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
return nil, errors.New("test error")
|
||||
return "", errors.New("test error")
|
||||
})
|
||||
|
||||
b.Go("ctx", func() (any, error) {
|
||||
b.Go("ctx", func() (string, error) {
|
||||
<-ctx.Done()
|
||||
return nil, ctx.Err()
|
||||
return "", ctx.Err()
|
||||
})
|
||||
|
||||
result, err := b.WaitAndGetResult()
|
||||
|
||||
37
common/cache/lrucache.go
vendored
37
common/cache/lrucache.go
vendored
@@ -3,19 +3,20 @@ package cache
|
||||
// Modified by https://github.com/die-net/lrucache
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Dreamacro/clash/common/generics/list"
|
||||
)
|
||||
|
||||
// Option is part of Functional Options Pattern
|
||||
type Option[K comparable, V any] func(*LruCache[K, V])
|
||||
|
||||
// EvictCallback is used to get a callback when a cache entry is evicted
|
||||
type EvictCallback = func(key any, value any)
|
||||
type EvictCallback[K comparable, V any] func(key K, value V)
|
||||
|
||||
// WithEvict set the evict callback
|
||||
func WithEvict[K comparable, V any](cb EvictCallback) Option[K, V] {
|
||||
func WithEvict[K comparable, V any](cb EvictCallback[K, V]) Option[K, V] {
|
||||
return func(l *LruCache[K, V]) {
|
||||
l.onEvict = cb
|
||||
}
|
||||
@@ -57,18 +58,18 @@ type LruCache[K comparable, V any] struct {
|
||||
maxAge int64
|
||||
maxSize int
|
||||
mu sync.Mutex
|
||||
cache map[any]*list.Element
|
||||
lru *list.List // Front is least-recent
|
||||
cache map[K]*list.Element[*entry[K, V]]
|
||||
lru *list.List[*entry[K, V]] // Front is least-recent
|
||||
updateAgeOnGet bool
|
||||
staleReturn bool
|
||||
onEvict EvictCallback
|
||||
onEvict EvictCallback[K, V]
|
||||
}
|
||||
|
||||
// NewLRUCache creates an LruCache
|
||||
func NewLRUCache[K comparable, V any](options ...Option[K, V]) *LruCache[K, V] {
|
||||
lc := &LruCache[K, V]{
|
||||
lru: list.New(),
|
||||
cache: make(map[any]*list.Element),
|
||||
lru: list.New[*entry[K, V]](),
|
||||
cache: make(map[K]*list.Element[*entry[K, V]]),
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
@@ -129,7 +130,7 @@ func (c *LruCache[K, V]) SetWithExpire(key K, value V, expires time.Time) {
|
||||
|
||||
if le, ok := c.cache[key]; ok {
|
||||
c.lru.MoveToBack(le)
|
||||
e := le.Value.(*entry[K, V])
|
||||
e := le.Value
|
||||
e.value = value
|
||||
e.expires = expires.Unix()
|
||||
} else {
|
||||
@@ -154,11 +155,11 @@ func (c *LruCache[K, V]) CloneTo(n *LruCache[K, V]) {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
n.lru = list.New()
|
||||
n.cache = make(map[any]*list.Element)
|
||||
n.lru = list.New[*entry[K, V]]()
|
||||
n.cache = make(map[K]*list.Element[*entry[K, V]])
|
||||
|
||||
for e := c.lru.Front(); e != nil; e = e.Next() {
|
||||
elm := e.Value.(*entry[K, V])
|
||||
elm := e.Value
|
||||
n.cache[elm.key] = n.lru.PushBack(elm)
|
||||
}
|
||||
}
|
||||
@@ -172,7 +173,7 @@ func (c *LruCache[K, V]) get(key K) *entry[K, V] {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.staleReturn && c.maxAge > 0 && le.Value.(*entry[K, V]).expires <= time.Now().Unix() {
|
||||
if !c.staleReturn && c.maxAge > 0 && le.Value.expires <= time.Now().Unix() {
|
||||
c.deleteElement(le)
|
||||
c.maybeDeleteOldest()
|
||||
|
||||
@@ -180,7 +181,7 @@ func (c *LruCache[K, V]) get(key K) *entry[K, V] {
|
||||
}
|
||||
|
||||
c.lru.MoveToBack(le)
|
||||
el := le.Value.(*entry[K, V])
|
||||
el := le.Value
|
||||
if c.maxAge > 0 && c.updateAgeOnGet {
|
||||
el.expires = time.Now().Unix() + c.maxAge
|
||||
}
|
||||
@@ -201,15 +202,15 @@ func (c *LruCache[K, V]) Delete(key K) {
|
||||
func (c *LruCache[K, V]) maybeDeleteOldest() {
|
||||
if !c.staleReturn && c.maxAge > 0 {
|
||||
now := time.Now().Unix()
|
||||
for le := c.lru.Front(); le != nil && le.Value.(*entry[K, V]).expires <= now; le = c.lru.Front() {
|
||||
for le := c.lru.Front(); le != nil && le.Value.expires <= now; le = c.lru.Front() {
|
||||
c.deleteElement(le)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LruCache[K, V]) deleteElement(le *list.Element) {
|
||||
func (c *LruCache[K, V]) deleteElement(le *list.Element[*entry[K, V]]) {
|
||||
c.lru.Remove(le)
|
||||
e := le.Value.(*entry[K, V])
|
||||
e := le.Value
|
||||
delete(c.cache, e.key)
|
||||
if c.onEvict != nil {
|
||||
c.onEvict(e.key, e.value)
|
||||
@@ -219,7 +220,7 @@ func (c *LruCache[K, V]) deleteElement(le *list.Element) {
|
||||
func (c *LruCache[K, V]) Clear() error {
|
||||
c.mu.Lock()
|
||||
|
||||
c.cache = make(map[any]*list.Element)
|
||||
c.cache = make(map[K]*list.Element[*entry[K, V]])
|
||||
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
|
||||
18
common/cache/lrucache_test.go
vendored
18
common/cache/lrucache_test.go
vendored
@@ -52,18 +52,18 @@ func TestLRUMaxAge(t *testing.T) {
|
||||
|
||||
// Add one expired entry
|
||||
c.Set("foo", "bar")
|
||||
c.lru.Back().Value.(*entry[string, string]).expires = now
|
||||
c.lru.Back().Value.expires = now
|
||||
|
||||
// Reset
|
||||
c.Set("foo", "bar")
|
||||
e := c.lru.Back().Value.(*entry[string, string])
|
||||
e := c.lru.Back().Value
|
||||
assert.True(t, e.expires >= now)
|
||||
c.lru.Back().Value.(*entry[string, string]).expires = now
|
||||
c.lru.Back().Value.expires = now
|
||||
|
||||
// Set a few and verify expiration times
|
||||
for _, s := range entries {
|
||||
c.Set(s.key, s.value)
|
||||
e := c.lru.Back().Value.(*entry[string, string])
|
||||
e := c.lru.Back().Value
|
||||
assert.True(t, e.expires >= expected && e.expires <= expected+10)
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ func TestLRUMaxAge(t *testing.T) {
|
||||
for _, s := range entries {
|
||||
le, ok := c.cache[s.key]
|
||||
if assert.True(t, ok) {
|
||||
le.Value.(*entry[string, string]).expires = now
|
||||
le.Value.expires = now
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,11 +95,11 @@ func TestLRUpdateOnGet(t *testing.T) {
|
||||
|
||||
// Add one expired entry
|
||||
c.Set("foo", "bar")
|
||||
c.lru.Back().Value.(*entry[string, string]).expires = expires
|
||||
c.lru.Back().Value.expires = expires
|
||||
|
||||
_, ok := c.Get("foo")
|
||||
assert.True(t, ok)
|
||||
assert.True(t, c.lru.Back().Value.(*entry[string, string]).expires > expires)
|
||||
assert.True(t, c.lru.Back().Value.expires > expires)
|
||||
}
|
||||
|
||||
func TestMaxSize(t *testing.T) {
|
||||
@@ -126,8 +126,8 @@ func TestExist(t *testing.T) {
|
||||
|
||||
func TestEvict(t *testing.T) {
|
||||
temp := 0
|
||||
evict := func(key any, value any) {
|
||||
temp = key.(int) + value.(int)
|
||||
evict := func(key int, value int) {
|
||||
temp = key + value
|
||||
}
|
||||
|
||||
c := NewLRUCache[int, int](WithEvict[int, int](evict), WithSize[int, int](1))
|
||||
|
||||
235
common/generics/list/list.go
Normal file
235
common/generics/list/list.go
Normal file
@@ -0,0 +1,235 @@
|
||||
// Copyright 2009 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package list implements a doubly linked list.
|
||||
//
|
||||
// To iterate over a list (where l is a *List):
|
||||
// for e := l.Front(); e != nil; e = e.Next() {
|
||||
// // do something with e.Value
|
||||
// }
|
||||
//
|
||||
package list
|
||||
|
||||
// Element is an element of a linked list.
|
||||
type Element[T any] struct {
|
||||
// Next and previous pointers in the doubly-linked list of elements.
|
||||
// To simplify the implementation, internally a list l is implemented
|
||||
// as a ring, such that &l.root is both the next element of the last
|
||||
// list element (l.Back()) and the previous element of the first list
|
||||
// element (l.Front()).
|
||||
next, prev *Element[T]
|
||||
|
||||
// The list to which this element belongs.
|
||||
list *List[T]
|
||||
|
||||
// The value stored with this element.
|
||||
Value T
|
||||
}
|
||||
|
||||
// Next returns the next list element or nil.
|
||||
func (e *Element[T]) Next() *Element[T] {
|
||||
if p := e.next; e.list != nil && p != &e.list.root {
|
||||
return p
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Prev returns the previous list element or nil.
|
||||
func (e *Element[T]) Prev() *Element[T] {
|
||||
if p := e.prev; e.list != nil && p != &e.list.root {
|
||||
return p
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// List represents a doubly linked list.
|
||||
// The zero value for List is an empty list ready to use.
|
||||
type List[T any] struct {
|
||||
root Element[T] // sentinel list element, only &root, root.prev, and root.next are used
|
||||
len int // current list length excluding (this) sentinel element
|
||||
}
|
||||
|
||||
// Init initializes or clears list l.
|
||||
func (l *List[T]) Init() *List[T] {
|
||||
l.root.next = &l.root
|
||||
l.root.prev = &l.root
|
||||
l.len = 0
|
||||
return l
|
||||
}
|
||||
|
||||
// New returns an initialized list.
|
||||
func New[T any]() *List[T] { return new(List[T]).Init() }
|
||||
|
||||
// Len returns the number of elements of list l.
|
||||
// The complexity is O(1).
|
||||
func (l *List[T]) Len() int { return l.len }
|
||||
|
||||
// Front returns the first element of list l or nil if the list is empty.
|
||||
func (l *List[T]) Front() *Element[T] {
|
||||
if l.len == 0 {
|
||||
return nil
|
||||
}
|
||||
return l.root.next
|
||||
}
|
||||
|
||||
// Back returns the last element of list l or nil if the list is empty.
|
||||
func (l *List[T]) Back() *Element[T] {
|
||||
if l.len == 0 {
|
||||
return nil
|
||||
}
|
||||
return l.root.prev
|
||||
}
|
||||
|
||||
// lazyInit lazily initializes a zero List value.
|
||||
func (l *List[T]) lazyInit() {
|
||||
if l.root.next == nil {
|
||||
l.Init()
|
||||
}
|
||||
}
|
||||
|
||||
// insert inserts e after at, increments l.len, and returns e.
|
||||
func (l *List[T]) insert(e, at *Element[T]) *Element[T] {
|
||||
e.prev = at
|
||||
e.next = at.next
|
||||
e.prev.next = e
|
||||
e.next.prev = e
|
||||
e.list = l
|
||||
l.len++
|
||||
return e
|
||||
}
|
||||
|
||||
// insertValue is a convenience wrapper for insert(&Element{Value: v}, at).
|
||||
func (l *List[T]) insertValue(v T, at *Element[T]) *Element[T] {
|
||||
return l.insert(&Element[T]{Value: v}, at)
|
||||
}
|
||||
|
||||
// remove removes e from its list, decrements l.len
|
||||
func (l *List[T]) remove(e *Element[T]) {
|
||||
e.prev.next = e.next
|
||||
e.next.prev = e.prev
|
||||
e.next = nil // avoid memory leaks
|
||||
e.prev = nil // avoid memory leaks
|
||||
e.list = nil
|
||||
l.len--
|
||||
}
|
||||
|
||||
// move moves e to next to at.
|
||||
func (l *List[T]) move(e, at *Element[T]) {
|
||||
if e == at {
|
||||
return
|
||||
}
|
||||
e.prev.next = e.next
|
||||
e.next.prev = e.prev
|
||||
|
||||
e.prev = at
|
||||
e.next = at.next
|
||||
e.prev.next = e
|
||||
e.next.prev = e
|
||||
}
|
||||
|
||||
// Remove removes e from l if e is an element of list l.
|
||||
// It returns the element value e.Value.
|
||||
// The element must not be nil.
|
||||
func (l *List[T]) Remove(e *Element[T]) T {
|
||||
if e.list == l {
|
||||
// if e.list == l, l must have been initialized when e was inserted
|
||||
// in l or l == nil (e is a zero Element) and l.remove will crash
|
||||
l.remove(e)
|
||||
}
|
||||
return e.Value
|
||||
}
|
||||
|
||||
// PushFront inserts a new element e with value v at the front of list l and returns e.
|
||||
func (l *List[T]) PushFront(v T) *Element[T] {
|
||||
l.lazyInit()
|
||||
return l.insertValue(v, &l.root)
|
||||
}
|
||||
|
||||
// PushBack inserts a new element e with value v at the back of list l and returns e.
|
||||
func (l *List[T]) PushBack(v T) *Element[T] {
|
||||
l.lazyInit()
|
||||
return l.insertValue(v, l.root.prev)
|
||||
}
|
||||
|
||||
// InsertBefore inserts a new element e with value v immediately before mark and returns e.
|
||||
// If mark is not an element of l, the list is not modified.
|
||||
// The mark must not be nil.
|
||||
func (l *List[T]) InsertBefore(v T, mark *Element[T]) *Element[T] {
|
||||
if mark.list != l {
|
||||
return nil
|
||||
}
|
||||
// see comment in List.Remove about initialization of l
|
||||
return l.insertValue(v, mark.prev)
|
||||
}
|
||||
|
||||
// InsertAfter inserts a new element e with value v immediately after mark and returns e.
|
||||
// If mark is not an element of l, the list is not modified.
|
||||
// The mark must not be nil.
|
||||
func (l *List[T]) InsertAfter(v T, mark *Element[T]) *Element[T] {
|
||||
if mark.list != l {
|
||||
return nil
|
||||
}
|
||||
// see comment in List.Remove about initialization of l
|
||||
return l.insertValue(v, mark)
|
||||
}
|
||||
|
||||
// MoveToFront moves element e to the front of list l.
|
||||
// If e is not an element of l, the list is not modified.
|
||||
// The element must not be nil.
|
||||
func (l *List[T]) MoveToFront(e *Element[T]) {
|
||||
if e.list != l || l.root.next == e {
|
||||
return
|
||||
}
|
||||
// see comment in List.Remove about initialization of l
|
||||
l.move(e, &l.root)
|
||||
}
|
||||
|
||||
// MoveToBack moves element e to the back of list l.
|
||||
// If e is not an element of l, the list is not modified.
|
||||
// The element must not be nil.
|
||||
func (l *List[T]) MoveToBack(e *Element[T]) {
|
||||
if e.list != l || l.root.prev == e {
|
||||
return
|
||||
}
|
||||
// see comment in List.Remove about initialization of l
|
||||
l.move(e, l.root.prev)
|
||||
}
|
||||
|
||||
// MoveBefore moves element e to its new position before mark.
|
||||
// If e or mark is not an element of l, or e == mark, the list is not modified.
|
||||
// The element and mark must not be nil.
|
||||
func (l *List[T]) MoveBefore(e, mark *Element[T]) {
|
||||
if e.list != l || e == mark || mark.list != l {
|
||||
return
|
||||
}
|
||||
l.move(e, mark.prev)
|
||||
}
|
||||
|
||||
// MoveAfter moves element e to its new position after mark.
|
||||
// If e or mark is not an element of l, or e == mark, the list is not modified.
|
||||
// The element and mark must not be nil.
|
||||
func (l *List[T]) MoveAfter(e, mark *Element[T]) {
|
||||
if e.list != l || e == mark || mark.list != l {
|
||||
return
|
||||
}
|
||||
l.move(e, mark)
|
||||
}
|
||||
|
||||
// PushBackList inserts a copy of another list at the back of list l.
|
||||
// The lists l and other may be the same. They must not be nil.
|
||||
func (l *List[T]) PushBackList(other *List[T]) {
|
||||
l.lazyInit()
|
||||
for i, e := other.Len(), other.Front(); i > 0; i, e = i-1, e.Next() {
|
||||
l.insertValue(e.Value, l.root.prev)
|
||||
}
|
||||
}
|
||||
|
||||
// PushFrontList inserts a copy of another list at the front of list l.
|
||||
// The lists l and other may be the same. They must not be nil.
|
||||
func (l *List[T]) PushFrontList(other *List[T]) {
|
||||
l.lazyInit()
|
||||
for i, e := other.Len(), other.Back(); i > 0; i, e = i-1, e.Prev() {
|
||||
l.insertValue(e.Value, &l.root)
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,3 @@
|
||||
package observable
|
||||
|
||||
type Iterable <-chan any
|
||||
type Iterable[T any] <-chan T
|
||||
|
||||
@@ -5,14 +5,14 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Observable struct {
|
||||
iterable Iterable
|
||||
listener map[Subscription]*Subscriber
|
||||
type Observable[T any] struct {
|
||||
iterable Iterable[T]
|
||||
listener map[Subscription[T]]*Subscriber[T]
|
||||
mux sync.Mutex
|
||||
done bool
|
||||
}
|
||||
|
||||
func (o *Observable) process() {
|
||||
func (o *Observable[T]) process() {
|
||||
for item := range o.iterable {
|
||||
o.mux.Lock()
|
||||
for _, sub := range o.listener {
|
||||
@@ -23,7 +23,7 @@ func (o *Observable) process() {
|
||||
o.close()
|
||||
}
|
||||
|
||||
func (o *Observable) close() {
|
||||
func (o *Observable[T]) close() {
|
||||
o.mux.Lock()
|
||||
defer o.mux.Unlock()
|
||||
|
||||
@@ -33,18 +33,18 @@ func (o *Observable) close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Observable) Subscribe() (Subscription, error) {
|
||||
func (o *Observable[T]) Subscribe() (Subscription[T], error) {
|
||||
o.mux.Lock()
|
||||
defer o.mux.Unlock()
|
||||
if o.done {
|
||||
return nil, errors.New("Observable is closed")
|
||||
return nil, errors.New("observable is closed")
|
||||
}
|
||||
subscriber := newSubscriber()
|
||||
subscriber := newSubscriber[T]()
|
||||
o.listener[subscriber.Out()] = subscriber
|
||||
return subscriber.Out(), nil
|
||||
}
|
||||
|
||||
func (o *Observable) UnSubscribe(sub Subscription) {
|
||||
func (o *Observable[T]) UnSubscribe(sub Subscription[T]) {
|
||||
o.mux.Lock()
|
||||
defer o.mux.Unlock()
|
||||
subscriber, exist := o.listener[sub]
|
||||
@@ -55,10 +55,10 @@ func (o *Observable) UnSubscribe(sub Subscription) {
|
||||
subscriber.Close()
|
||||
}
|
||||
|
||||
func NewObservable(any Iterable) *Observable {
|
||||
observable := &Observable{
|
||||
iterable: any,
|
||||
listener: map[Subscription]*Subscriber{},
|
||||
func NewObservable[T any](iter Iterable[T]) *Observable[T] {
|
||||
observable := &Observable[T]{
|
||||
iterable: iter,
|
||||
listener: map[Subscription[T]]*Subscriber[T]{},
|
||||
}
|
||||
go observable.process()
|
||||
return observable
|
||||
|
||||
@@ -9,8 +9,8 @@ import (
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
func iterator(item []any) chan any {
|
||||
ch := make(chan any)
|
||||
func iterator[T any](item []T) chan T {
|
||||
ch := make(chan T)
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
for _, elm := range item {
|
||||
@@ -22,8 +22,8 @@ func iterator(item []any) chan any {
|
||||
}
|
||||
|
||||
func TestObservable(t *testing.T) {
|
||||
iter := iterator([]any{1, 2, 3, 4, 5})
|
||||
src := NewObservable(iter)
|
||||
iter := iterator[int]([]int{1, 2, 3, 4, 5})
|
||||
src := NewObservable[int](iter)
|
||||
data, err := src.Subscribe()
|
||||
assert.Nil(t, err)
|
||||
count := 0
|
||||
@@ -34,15 +34,15 @@ func TestObservable(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestObservable_MultiSubscribe(t *testing.T) {
|
||||
iter := iterator([]any{1, 2, 3, 4, 5})
|
||||
src := NewObservable(iter)
|
||||
iter := iterator[int]([]int{1, 2, 3, 4, 5})
|
||||
src := NewObservable[int](iter)
|
||||
ch1, _ := src.Subscribe()
|
||||
ch2, _ := src.Subscribe()
|
||||
count := atomic.NewInt32(0)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
waitCh := func(ch <-chan any) {
|
||||
waitCh := func(ch <-chan int) {
|
||||
for range ch {
|
||||
count.Inc()
|
||||
}
|
||||
@@ -55,8 +55,8 @@ func TestObservable_MultiSubscribe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestObservable_UnSubscribe(t *testing.T) {
|
||||
iter := iterator([]any{1, 2, 3, 4, 5})
|
||||
src := NewObservable(iter)
|
||||
iter := iterator[int]([]int{1, 2, 3, 4, 5})
|
||||
src := NewObservable[int](iter)
|
||||
data, err := src.Subscribe()
|
||||
assert.Nil(t, err)
|
||||
src.UnSubscribe(data)
|
||||
@@ -65,8 +65,8 @@ func TestObservable_UnSubscribe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestObservable_SubscribeClosedSource(t *testing.T) {
|
||||
iter := iterator([]any{1})
|
||||
src := NewObservable(iter)
|
||||
iter := iterator[int]([]int{1})
|
||||
src := NewObservable[int](iter)
|
||||
data, _ := src.Subscribe()
|
||||
<-data
|
||||
|
||||
@@ -75,18 +75,18 @@ func TestObservable_SubscribeClosedSource(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestObservable_UnSubscribeWithNotExistSubscription(t *testing.T) {
|
||||
sub := Subscription(make(chan any))
|
||||
iter := iterator([]any{1})
|
||||
src := NewObservable(iter)
|
||||
sub := Subscription[int](make(chan int))
|
||||
iter := iterator[int]([]int{1})
|
||||
src := NewObservable[int](iter)
|
||||
src.UnSubscribe(sub)
|
||||
}
|
||||
|
||||
func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
|
||||
iter := iterator([]any{1, 2, 3, 4, 5})
|
||||
src := NewObservable(iter)
|
||||
iter := iterator[int]([]int{1, 2, 3, 4, 5})
|
||||
src := NewObservable[int](iter)
|
||||
max := 100
|
||||
|
||||
var list []Subscription
|
||||
var list []Subscription[int]
|
||||
for i := 0; i < max; i++ {
|
||||
ch, _ := src.Subscribe()
|
||||
list = append(list, ch)
|
||||
@@ -94,7 +94,7 @@ func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(max)
|
||||
waitCh := func(ch <-chan any) {
|
||||
waitCh := func(ch <-chan int) {
|
||||
for range ch {
|
||||
}
|
||||
wg.Done()
|
||||
@@ -115,11 +115,11 @@ func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
|
||||
}
|
||||
|
||||
func Benchmark_Observable_1000(b *testing.B) {
|
||||
ch := make(chan any)
|
||||
o := NewObservable(ch)
|
||||
ch := make(chan int)
|
||||
o := NewObservable[int](ch)
|
||||
num := 1000
|
||||
|
||||
subs := []Subscription{}
|
||||
subs := []Subscription[int]{}
|
||||
for i := 0; i < num; i++ {
|
||||
sub, _ := o.Subscribe()
|
||||
subs = append(subs, sub)
|
||||
@@ -130,7 +130,7 @@ func Benchmark_Observable_1000(b *testing.B) {
|
||||
|
||||
b.ResetTimer()
|
||||
for _, sub := range subs {
|
||||
go func(s Subscription) {
|
||||
go func(s Subscription[int]) {
|
||||
for range s {
|
||||
}
|
||||
wg.Done()
|
||||
|
||||
@@ -4,30 +4,30 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Subscription <-chan any
|
||||
type Subscription[T any] <-chan T
|
||||
|
||||
type Subscriber struct {
|
||||
buffer chan any
|
||||
type Subscriber[T any] struct {
|
||||
buffer chan T
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func (s *Subscriber) Emit(item any) {
|
||||
func (s *Subscriber[T]) Emit(item T) {
|
||||
s.buffer <- item
|
||||
}
|
||||
|
||||
func (s *Subscriber) Out() Subscription {
|
||||
func (s *Subscriber[T]) Out() Subscription[T] {
|
||||
return s.buffer
|
||||
}
|
||||
|
||||
func (s *Subscriber) Close() {
|
||||
func (s *Subscriber[T]) Close() {
|
||||
s.once.Do(func() {
|
||||
close(s.buffer)
|
||||
})
|
||||
}
|
||||
|
||||
func newSubscriber() *Subscriber {
|
||||
sub := &Subscriber{
|
||||
buffer: make(chan any, 200),
|
||||
func newSubscriber[T any]() *Subscriber[T] {
|
||||
sub := &Subscriber[T]{
|
||||
buffer: make(chan T, 200),
|
||||
}
|
||||
return sub
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
// Picker provides synchronization, and Context cancelation
|
||||
// for groups of goroutines working on subtasks of a common task.
|
||||
// Inspired by errGroup
|
||||
type Picker struct {
|
||||
type Picker[T any] struct {
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
@@ -17,12 +17,12 @@ type Picker struct {
|
||||
|
||||
once sync.Once
|
||||
errOnce sync.Once
|
||||
result any
|
||||
result T
|
||||
err error
|
||||
}
|
||||
|
||||
func newPicker(ctx context.Context, cancel func()) *Picker {
|
||||
return &Picker{
|
||||
func newPicker[T any](ctx context.Context, cancel func()) *Picker[T] {
|
||||
return &Picker[T]{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
@@ -30,20 +30,20 @@ func newPicker(ctx context.Context, cancel func()) *Picker {
|
||||
|
||||
// WithContext returns a new Picker and an associated Context derived from ctx.
|
||||
// and cancel when first element return.
|
||||
func WithContext(ctx context.Context) (*Picker, context.Context) {
|
||||
func WithContext[T any](ctx context.Context) (*Picker[T], context.Context) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return newPicker(ctx, cancel), ctx
|
||||
return newPicker[T](ctx, cancel), ctx
|
||||
}
|
||||
|
||||
// WithTimeout returns a new Picker and an associated Context derived from ctx with timeout.
|
||||
func WithTimeout(ctx context.Context, timeout time.Duration) (*Picker, context.Context) {
|
||||
func WithTimeout[T any](ctx context.Context, timeout time.Duration) (*Picker[T], context.Context) {
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
return newPicker(ctx, cancel), ctx
|
||||
return newPicker[T](ctx, cancel), ctx
|
||||
}
|
||||
|
||||
// Wait blocks until all function calls from the Go method have returned,
|
||||
// then returns the first nil error result (if any) from them.
|
||||
func (p *Picker) Wait() any {
|
||||
func (p *Picker[T]) Wait() T {
|
||||
p.wg.Wait()
|
||||
if p.cancel != nil {
|
||||
p.cancel()
|
||||
@@ -52,13 +52,13 @@ func (p *Picker) Wait() any {
|
||||
}
|
||||
|
||||
// Error return the first error (if all success return nil)
|
||||
func (p *Picker) Error() error {
|
||||
func (p *Picker[T]) Error() error {
|
||||
return p.err
|
||||
}
|
||||
|
||||
// Go calls the given function in a new goroutine.
|
||||
// The first call to return a nil error cancels the group; its result will be returned by Wait.
|
||||
func (p *Picker) Go(f func() (any, error)) {
|
||||
func (p *Picker[T]) Go(f func() (T, error)) {
|
||||
p.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
|
||||
@@ -8,33 +8,38 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func sleepAndSend(ctx context.Context, delay int, input any) func() (any, error) {
|
||||
return func() (any, error) {
|
||||
func sleepAndSend[T any](ctx context.Context, delay int, input T) func() (T, error) {
|
||||
return func() (T, error) {
|
||||
timer := time.NewTimer(time.Millisecond * time.Duration(delay))
|
||||
select {
|
||||
case <-timer.C:
|
||||
return input, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
return getZero[T](), ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPicker_Basic(t *testing.T) {
|
||||
picker, ctx := WithContext(context.Background())
|
||||
picker, ctx := WithContext[int](context.Background())
|
||||
picker.Go(sleepAndSend(ctx, 30, 2))
|
||||
picker.Go(sleepAndSend(ctx, 20, 1))
|
||||
|
||||
number := picker.Wait()
|
||||
assert.NotNil(t, number)
|
||||
assert.Equal(t, number.(int), 1)
|
||||
assert.Equal(t, number, 1)
|
||||
}
|
||||
|
||||
func TestPicker_Timeout(t *testing.T) {
|
||||
picker, ctx := WithTimeout(context.Background(), time.Millisecond*5)
|
||||
picker, ctx := WithTimeout[int](context.Background(), time.Millisecond*5)
|
||||
picker.Go(sleepAndSend(ctx, 20, 1))
|
||||
|
||||
number := picker.Wait()
|
||||
assert.Nil(t, number)
|
||||
assert.Equal(t, number, getZero[int]())
|
||||
assert.NotNil(t, picker.Error())
|
||||
}
|
||||
|
||||
func getZero[T any]() T {
|
||||
var result T
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -5,28 +5,28 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type call struct {
|
||||
type call[T any] struct {
|
||||
wg sync.WaitGroup
|
||||
val any
|
||||
val T
|
||||
err error
|
||||
}
|
||||
|
||||
type Single struct {
|
||||
type Single[T any] struct {
|
||||
mux sync.Mutex
|
||||
last time.Time
|
||||
wait time.Duration
|
||||
call *call
|
||||
result *Result
|
||||
call *call[T]
|
||||
result *Result[T]
|
||||
}
|
||||
|
||||
type Result struct {
|
||||
Val any
|
||||
type Result[T any] struct {
|
||||
Val T
|
||||
Err error
|
||||
}
|
||||
|
||||
// Do single.Do likes sync.singleFlight
|
||||
//lint:ignore ST1008 it likes sync.singleFlight
|
||||
func (s *Single) Do(fn func() (any, error)) (v any, err error, shared bool) {
|
||||
func (s *Single[T]) Do(fn func() (T, error)) (v T, err error, shared bool) {
|
||||
s.mux.Lock()
|
||||
now := time.Now()
|
||||
if now.Before(s.last.Add(s.wait)) {
|
||||
@@ -34,31 +34,31 @@ func (s *Single) Do(fn func() (any, error)) (v any, err error, shared bool) {
|
||||
return s.result.Val, s.result.Err, true
|
||||
}
|
||||
|
||||
if call := s.call; call != nil {
|
||||
if callM := s.call; callM != nil {
|
||||
s.mux.Unlock()
|
||||
call.wg.Wait()
|
||||
return call.val, call.err, true
|
||||
callM.wg.Wait()
|
||||
return callM.val, callM.err, true
|
||||
}
|
||||
|
||||
call := &call{}
|
||||
call.wg.Add(1)
|
||||
s.call = call
|
||||
callM := &call[T]{}
|
||||
callM.wg.Add(1)
|
||||
s.call = callM
|
||||
s.mux.Unlock()
|
||||
call.val, call.err = fn()
|
||||
call.wg.Done()
|
||||
callM.val, callM.err = fn()
|
||||
callM.wg.Done()
|
||||
|
||||
s.mux.Lock()
|
||||
s.call = nil
|
||||
s.result = &Result{call.val, call.err}
|
||||
s.result = &Result[T]{callM.val, callM.err}
|
||||
s.last = now
|
||||
s.mux.Unlock()
|
||||
return call.val, call.err, false
|
||||
return callM.val, callM.err, false
|
||||
}
|
||||
|
||||
func (s *Single) Reset() {
|
||||
func (s *Single[T]) Reset() {
|
||||
s.last = time.Time{}
|
||||
}
|
||||
|
||||
func NewSingle(wait time.Duration) *Single {
|
||||
return &Single{wait: wait}
|
||||
func NewSingle[T any](wait time.Duration) *Single[T] {
|
||||
return &Single[T]{wait: wait}
|
||||
}
|
||||
|
||||
@@ -10,13 +10,13 @@ import (
|
||||
)
|
||||
|
||||
func TestBasic(t *testing.T) {
|
||||
single := NewSingle(time.Millisecond * 30)
|
||||
single := NewSingle[int](time.Millisecond * 30)
|
||||
foo := 0
|
||||
shardCount := atomic.NewInt32(0)
|
||||
call := func() (any, error) {
|
||||
call := func() (int, error) {
|
||||
foo++
|
||||
time.Sleep(time.Millisecond * 5)
|
||||
return nil, nil
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
@@ -38,32 +38,32 @@ func TestBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTimer(t *testing.T) {
|
||||
single := NewSingle(time.Millisecond * 30)
|
||||
single := NewSingle[int](time.Millisecond * 30)
|
||||
foo := 0
|
||||
call := func() (any, error) {
|
||||
callM := func() (int, error) {
|
||||
foo++
|
||||
return nil, nil
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
single.Do(call)
|
||||
_, _, _ = single.Do(callM)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
_, _, shard := single.Do(call)
|
||||
_, _, shard := single.Do(callM)
|
||||
|
||||
assert.Equal(t, 1, foo)
|
||||
assert.True(t, shard)
|
||||
}
|
||||
|
||||
func TestReset(t *testing.T) {
|
||||
single := NewSingle(time.Millisecond * 30)
|
||||
single := NewSingle[int](time.Millisecond * 30)
|
||||
foo := 0
|
||||
call := func() (any, error) {
|
||||
callM := func() (int, error) {
|
||||
foo++
|
||||
return nil, nil
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
single.Do(call)
|
||||
_, _, _ = single.Do(callM)
|
||||
single.Reset()
|
||||
single.Do(call)
|
||||
_, _, _ = single.Do(callM)
|
||||
|
||||
assert.Equal(t, 2, foo)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user