chore: better packet deadline
This commit is contained in:
@@ -11,12 +11,13 @@ import (
|
||||
|
||||
type readResult struct {
|
||||
data []byte
|
||||
put func()
|
||||
addr net.Addr
|
||||
err error
|
||||
enhanceReadResult
|
||||
singReadResult
|
||||
}
|
||||
|
||||
type PacketConn struct {
|
||||
type NetPacketConn struct {
|
||||
net.PacketConn
|
||||
deadline atomic.TypedValue[time.Time]
|
||||
pipeDeadline pipeDeadline
|
||||
@@ -25,23 +26,45 @@ type PacketConn struct {
|
||||
resultCh chan *readResult
|
||||
}
|
||||
|
||||
func NewPacketConn(pc net.PacketConn) net.PacketConn {
|
||||
c := &PacketConn{
|
||||
func NewNetPacketConn(pc net.PacketConn) net.PacketConn {
|
||||
npc := &NetPacketConn{
|
||||
PacketConn: pc,
|
||||
pipeDeadline: makePipeDeadline(),
|
||||
resultCh: make(chan *readResult, 1),
|
||||
}
|
||||
c.resultCh <- nil
|
||||
if enhancePacketConn, isEnhance := pc.(packet.EnhancePacketConn); isEnhance {
|
||||
return &EnhancePacketConn{
|
||||
PacketConn: c,
|
||||
enhancePacketConn: enhancePacketConn,
|
||||
npc.resultCh <- nil
|
||||
if enhancePC, isEnhance := pc.(packet.EnhancePacketConn); isEnhance {
|
||||
epc := &EnhancePacketConn{
|
||||
NetPacketConn: npc,
|
||||
enhancePacketConn: enhancePacketConn{
|
||||
netPacketConn: npc,
|
||||
enhancePacketConn: enhancePC,
|
||||
},
|
||||
}
|
||||
if singPC, isSingPC := pc.(packet.SingPacketConn); isSingPC {
|
||||
return &EnhanceSingPacketConn{
|
||||
EnhancePacketConn: epc,
|
||||
singPacketConn: singPacketConn{
|
||||
netPacketConn: npc,
|
||||
singPacketConn: singPC,
|
||||
},
|
||||
}
|
||||
}
|
||||
return epc
|
||||
}
|
||||
if singPC, isSingPC := pc.(packet.SingPacketConn); isSingPC {
|
||||
return &SingPacketConn{
|
||||
NetPacketConn: npc,
|
||||
singPacketConn: singPacketConn{
|
||||
netPacketConn: npc,
|
||||
singPacketConn: singPC,
|
||||
},
|
||||
}
|
||||
}
|
||||
return c
|
||||
return npc
|
||||
}
|
||||
|
||||
func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||
func (c *NetPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||
select {
|
||||
case result := <-c.resultCh:
|
||||
if result != nil {
|
||||
@@ -73,7 +96,7 @@ func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||
return c.ReadFrom(p)
|
||||
}
|
||||
|
||||
func (c *PacketConn) pipeReadFrom(size int) {
|
||||
func (c *NetPacketConn) pipeReadFrom(size int) {
|
||||
buffer := make([]byte, size)
|
||||
n, addr, err := c.PacketConn.ReadFrom(buffer)
|
||||
buffer = buffer[:n]
|
||||
@@ -84,7 +107,7 @@ func (c *PacketConn) pipeReadFrom(size int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *PacketConn) SetReadDeadline(t time.Time) error {
|
||||
func (c *NetPacketConn) SetReadDeadline(t time.Time) error {
|
||||
if c.disablePipe.Load() {
|
||||
return c.PacketConn.SetReadDeadline(t)
|
||||
} else if c.inRead.Load() {
|
||||
@@ -96,7 +119,7 @@ func (c *PacketConn) SetReadDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *PacketConn) ReaderReplaceable() bool {
|
||||
func (c *NetPacketConn) ReaderReplaceable() bool {
|
||||
select {
|
||||
case result := <-c.resultCh:
|
||||
c.resultCh <- result
|
||||
@@ -111,66 +134,14 @@ func (c *PacketConn) ReaderReplaceable() bool {
|
||||
return c.disablePipe.Load() || c.deadline.Load().IsZero()
|
||||
}
|
||||
|
||||
func (c *PacketConn) WriterReplaceable() bool {
|
||||
func (c *NetPacketConn) WriterReplaceable() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *PacketConn) Upstream() any {
|
||||
func (c *NetPacketConn) Upstream() any {
|
||||
return c.PacketConn
|
||||
}
|
||||
|
||||
func (c *PacketConn) NeedAdditionalReadDeadline() bool {
|
||||
func (c *NetPacketConn) NeedAdditionalReadDeadline() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type EnhancePacketConn struct {
|
||||
*PacketConn
|
||||
enhancePacketConn packet.EnhancePacketConn
|
||||
}
|
||||
|
||||
func NewEnhancePacketConn(pc packet.EnhancePacketConn) packet.EnhancePacketConn {
|
||||
return NewPacketConn(pc).(packet.EnhancePacketConn)
|
||||
}
|
||||
|
||||
func (c *EnhancePacketConn) WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) {
|
||||
select {
|
||||
case result := <-c.resultCh:
|
||||
if result != nil {
|
||||
data = result.data
|
||||
put = result.put
|
||||
addr = result.addr
|
||||
err = result.err
|
||||
c.resultCh <- nil // finish cache read
|
||||
return
|
||||
} else {
|
||||
c.resultCh <- nil
|
||||
break
|
||||
}
|
||||
case <-c.pipeDeadline.wait():
|
||||
return nil, nil, nil, os.ErrDeadlineExceeded
|
||||
}
|
||||
|
||||
if c.disablePipe.Load() {
|
||||
return c.enhancePacketConn.WaitReadFrom()
|
||||
} else if c.deadline.Load().IsZero() {
|
||||
c.inRead.Store(true)
|
||||
defer c.inRead.Store(false)
|
||||
data, put, addr, err = c.enhancePacketConn.WaitReadFrom()
|
||||
return
|
||||
}
|
||||
|
||||
<-c.resultCh
|
||||
go c.pipeWaitReadFrom()
|
||||
|
||||
return c.WaitReadFrom()
|
||||
}
|
||||
|
||||
func (c *EnhancePacketConn) pipeWaitReadFrom() {
|
||||
data, put, addr, err := c.enhancePacketConn.WaitReadFrom()
|
||||
c.resultCh <- &readResult{
|
||||
data: data,
|
||||
put: put,
|
||||
addr: addr,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user