Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dmsgweb handle multiple tcp connections correctly #270

Merged
merged 24 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 36 additions & 25 deletions cmd/dmsgweb/commands/dmsgweb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"context"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -181,20 +180,20 @@
dmsgWebLog.Info("dmsg client pk: ", pk.String())
if len(resolveDmsgAddr) > 0 {
dialPK = make([]cipher.PubKey, len(resolveDmsgAddr))
dmsgPorts = make([]uint, dmsgSessions)
dmsgPorts = make([]uint, len(resolveDmsgAddr))
for i, dmsgaddr := range resolveDmsgAddr {
dmsgWebLog.Info("dmsg address to dial: ", dmsgaddr)
dmsgAddr = strings.Split(dmsgaddr, ":")
var setpk cipher.PubKey
err := setpk.Set(dmsgAddr[0])
if err != nil {
log.Fatalf("failed to parse dmsg <address>:<port> : %v", err)
dmsgWebLog.WithError(err).Fatal("failed to parse dmsg <address>:<port>")
}
dialPK[i] = setpk
if len(dmsgAddr) > 1 {
dport, err := strconv.ParseUint(dmsgAddr[1], 10, 64)
if err != nil {
log.Fatalf("Failed to parse dmsg port: %v", err)
dmsgWebLog.WithError(err).Fatal("Failed to parse dmsg port")
}
dmsgPorts[i] = uint(dport)
} else {
Expand All @@ -207,7 +206,7 @@
// Use SOCKS5 proxy dialer if specified
dialer, err = proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
if err != nil {
log.Fatalf("Error creating SOCKS5 dialer: %v", err)
dmsgWebLog.WithError(err).Fatal("Error creating SOCKS5 dialer")
}
transport := &http.Transport{
Dial: dialer.Dial,
Expand Down Expand Up @@ -267,19 +266,19 @@

// Start the SOCKS5 server
socksAddr := fmt.Sprintf("127.0.0.1:%v", proxyPort)
log.Printf("SOCKS5 proxy server started on %s", socksAddr)
dmsgWebLog.Debug("SOCKS5 proxy server started on", socksAddr)

server, err := socks5.New(conf)
if err != nil {
log.Fatalf("Failed to create SOCKS5 server: %v", err)
dmsgWebLog.WithError(err).Fatal("Failed to create SOCKS5 server")
}

wg.Add(1)
go func() {
dmsgWebLog.Debug("Serving SOCKS5 proxy on " + socksAddr)
err := server.ListenAndServe("tcp", socksAddr)
if err != nil {
log.Fatalf("Failed to start SOCKS5 server: %v", err)
dmsgWebLog.WithError(err).Fatal("Failed to start SOCKS5 server")
}
defer server.Close()
dmsgWebLog.Debug("Stopped serving SOCKS5 proxy on " + socksAddr)
Expand All @@ -288,24 +287,28 @@

if len(resolveDmsgAddr) == 0 && len(webPort) == 1 {
if rawTCP[0] {
proxyTCPConn(-1)
dmsgWebLog.Debug("proxyTCPConn(-1)")
proxyTCPConn(-1, dmsgC)
} else {
proxyHTTPConn(-1)
dmsgWebLog.Debug("proxyHTTPConn(-1)")
proxyHTTPConn(-1, dmsgC)
}
} else {
for i := range resolveDmsgAddr {
if rawTCP[i] {
proxyTCPConn(i)
dmsgWebLog.Debug("proxyTCPConn(" + fmt.Sprintf("%v", i) + ")")
proxyTCPConn(i, dmsgC)
} else {
proxyHTTPConn(i)
dmsgWebLog.Debug("proxyHTTPConn(" + fmt.Sprintf("%v", i) + ")")
proxyHTTPConn(i, dmsgC)
}
}
}
wg.Wait()
},
}

func proxyHTTPConn(n int) {
func proxyHTTPConn(n int, dmsgC *dmsg.Client) {

Check failure on line 311 in cmd/dmsgweb/commands/dmsgweb.go

View workflow job for this annotation

GitHub Actions / linux

unused-parameter: parameter 'dmsgC' seems to be unused, consider removing or renaming it as _ (revive)
r := gin.New()

r.Use(gin.Recovery())
Expand Down Expand Up @@ -333,10 +336,11 @@
}
}

fmt.Printf("Proxying request: %s %s\n", c.Request.Method, urlStr)
dmsgWebLog.Debug("Proxying request: %s %s\n", c.Request.Method, urlStr)
req, err := http.NewRequest(c.Request.Method, urlStr, c.Request.Body)
if err != nil {
c.String(http.StatusInternalServerError, "Failed to create HTTP request")
dmsgWebLog.WithError(err).Warn("Failed to create HTTP request")
return
}

Expand All @@ -349,7 +353,7 @@
resp, err := httpC.Do(req)
if err != nil {
c.String(http.StatusInternalServerError, "Failed to connect to HTTP server")
fmt.Printf("Error: %v\n", err)
dmsgWebLog.WithError(err).Warn("Failed to connect to HTTP server")
return
}
defer resp.Body.Close() //nolint
Expand All @@ -363,7 +367,7 @@
c.Status(resp.StatusCode)
if _, err := io.Copy(c.Writer, resp.Body); err != nil {
c.String(http.StatusInternalServerError, "Failed to copy response body")
fmt.Printf("Error copying response body: %v\n", err)
dmsgWebLog.WithError(err).Warn("Failed to copy response body")
}
})
wg.Add(1)
Expand All @@ -380,7 +384,7 @@
wg.Done()
}()
}
func proxyTCPConn(n int) {
func proxyTCPConn(n int, dmsgC *dmsg.Client) {
var thiswebport uint
if n == -1 {
thiswebport = webPort[0]
Expand All @@ -389,49 +393,56 @@
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", thiswebport))
if err != nil {
dmsgWebLog.Fatalf("Failed to start TCP listener on port %v: %v", thiswebport, err)
dmsgWebLog.WithError(err).Fatal(fmt.Sprintf("Failed to start TCP listener on port: %v", thiswebport))
}
defer listener.Close() //nolint
log.Printf("Serving TCP on 127.0.0.1:%v", thiswebport)
dmsgWebLog.Debug("Serving TCP on 127.0.0.1:", thiswebport)
if dmsgC == nil {
dmsgWebLog.Fatal("dmsgC is nil")
}

for {
conn, err := listener.Accept()
if err != nil {
log.Printf("Failed to accept connection: %v", err)
dmsgWebLog.WithError(err).Warn("Failed to accept connection")
continue
}

wg.Add(1)
go func(conn net.Conn, n int) {
go func(conn net.Conn, n int, dmsgC *dmsg.Client) {
defer wg.Done()
defer conn.Close() //nolint
dp, ok := safecast.To[uint16](dmsgPorts[n])
if !ok {
dmsgWebLog.Fatal("uint16 overflow when converting dmsg port")
}
// dialPK[n].Set("020679271a434fd4a362000ccba80ce583df58b5a16cba091004f657406443e773")
// dp = uint16(8000)
// dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK[n], Port: dp}) //nolint
dmsgWebLog.Debug(fmt.Sprintf("Dialing dmsg address: %v ; port: %v", dialPK[n].String(), dmsgPorts[n]))
dmsgConn, err := dmsgC.DialStream(context.Background(), dmsg.Addr{PK: dialPK[n], Port: dp}) //nolint
if err != nil {
log.Printf("Failed to dial dmsg address %v:%v %v", dialPK[n].String(), dmsgPorts[n], err)
dmsgWebLog.WithError(err).Warn(fmt.Sprintf("Failed to dial dmsg address %v port %v", dialPK[n].String(), dmsgPorts[n]))
return
}
defer dmsgConn.Close() //nolint

go func() {
_, err := io.Copy(dmsgConn, conn)
if err != nil {
log.Printf("Error copying data to dmsg server: %v", err)
dmsgWebLog.WithError(err).Warn("Error on io.Copy(dmsgConn, conn)")
}
dmsgConn.Close() //nolint
}()

go func() {
_, err := io.Copy(conn, dmsgConn)
if err != nil {
log.Printf("Error copying data from dmsg server: %v", err)
dmsgWebLog.WithError(err).Warn("Error on io.Copy(conn, dmsgConn)")
}
conn.Close() //nolint
}()
}(conn, n)
}(conn, n, dmsgC)
}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/dmsgweb/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
var (
httpC http.Client
dmsgC *dmsg.Client
closeDmsg func()
dmsgDisc = dmsg.DiscAddr(false)
proxyAddr string
dmsgSessions int
Expand Down
Loading
Loading