Files
kettuRay/core/core_manager.go
2026-03-31 14:40:03 +03:00

432 lines
11 KiB
Go

package core
import (
"context"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
// StateChangedHandler is the callback signature used to report a change of
// the VPN connection state. It receives the new state.
type StateChangedHandler func(state ConnectionState)
// LogHandler is the callback signature used to deliver log messages.
// source is a short label naming the origin of the message (this file uses
// "Core", "Xray" and "SingBox"); message is the log text.
type LogHandler func(source, message string)
// CoreManager manages the lifecycle of xray (proxy backend) and sing-box (TUN frontend).
//
// Connection flow:
//  1. Parse VPN link
//  2. Generate configs for xray and sing-box
//  3. Start xray (SOCKS5 inbound -> proxy outbound)
//  4. Wait for SOCKS5 port readiness
//  5. Start sing-box (TUN -> SOCKS5)
//
// A CoreManager contains a sync.Mutex and must not be copied; use the pointer
// returned by NewCoreManager.
type CoreManager struct {
// mu is the lock taken when reading or updating the mutable fields below.
mu sync.Mutex
// xrayProcess and singBoxProcess are the commands started by Connect;
// cleanup resets them to nil.
xrayProcess *exec.Cmd
singBoxProcess *exec.Cmd
// xrayConfigPath and singBoxConfigPath are the config files written by
// Connect; cleanup deletes the files and resets the paths to "".
xrayConfigPath string
singBoxConfigPath string
// cancel cancels the context created by the most recent Connect call.
cancel context.CancelFunc
// coresPath is the directory Connect looks in for xray.exe and sing-box.exe.
coresPath string
// configsPath is the directory the generated config files are written to.
configsPath string
// State is the current connection state; it is updated through setState.
State ConnectionState
// CurrentServer is the link parsed by Connect; Disconnect resets it to nil.
CurrentServer *ProxyLink
// OnStateChanged, when non-nil, is invoked with the new state each time
// State changes to a different value.
OnStateChanged StateChangedHandler
// OnLog, when non-nil, receives every log message produced by the manager
// and by the output of the managed processes.
OnLog LogHandler
}
// NewCoreManager builds a CoreManager whose working directories live in a
// "kettuRay" folder under the user configuration directory reported by
// os.UserConfigDir (%APPDATA% on Windows).
//
// The "cores" and "configs" sub-directories are created if missing, the
// manager starts in the Disconnected state, and any xray/sing-box processes
// left over from a previous run are killed before the manager is returned.
// An error is returned when the configuration directory cannot be determined
// or either sub-directory cannot be created.
func NewCoreManager() (*CoreManager, error) {
	configRoot, err := os.UserConfigDir()
	if err != nil {
		return nil, fmt.Errorf("failed to get app data dir: %w", err)
	}

	base := filepath.Join(configRoot, "kettuRay")
	manager := &CoreManager{
		coresPath:   filepath.Join(base, "cores"),
		configsPath: filepath.Join(base, "configs"),
		State:       Disconnected,
	}

	if err = os.MkdirAll(manager.coresPath, 0o755); err != nil {
		return nil, fmt.Errorf("failed to create cores dir: %w", err)
	}
	if err = os.MkdirAll(manager.configsPath, 0o755); err != nil {
		return nil, fmt.Errorf("failed to create configs dir: %w", err)
	}

	// A previous crash may have left xray/sing-box running; remove them so
	// they cannot hold the SOCKS port or the TUN device.
	manager.killStaleProcesses()
	return manager, nil
}
// Connect starts the VPN connection using the provided link.
//
// The sequence is: kill stale core processes, parse the link, generate and
// save the xray and sing-box configs, start xray, wait for its SOCKS5 port,
// start sing-box, then give the TUN two seconds to come up and verify that
// both processes are still running.
//
// It returns nil without doing anything when the manager is already Connected
// or Connecting. It also returns nil when the attempt is cancelled part-way
// through one of the waits; the state is then set to Disconnected. Every
// other failure cleans up, sets the state to Error and returns the error.
func (cm *CoreManager) Connect(link string) error {
	ctx, cancel := context.WithCancel(context.Background())

	// Check the state and claim Connecting under a single lock acquisition so
	// that two concurrent callers cannot both start a connection attempt.
	cm.mu.Lock()
	if cm.State == Connected || cm.State == Connecting {
		cm.mu.Unlock()
		cancel()
		cm.log("Core", "Already connected or connecting. Disconnect first.")
		return nil
	}
	cm.State = Connecting
	cm.cancel = cancel
	onStateChanged := cm.OnStateChanged
	cm.mu.Unlock()
	// Handlers are always invoked outside the lock (same as setState).
	if onStateChanged != nil {
		onStateChanged(Connecting)
	}

	// Release the context unless the attempt ended in the Connected state, in
	// which case it stays alive until Disconnect/Close cancels it.
	defer func() {
		cm.mu.Lock()
		connected := cm.State == Connected
		cm.mu.Unlock()
		if !connected {
			cancel()
		}
	}()

	// fail logs msg, tears everything down, flags the Error state and hands
	// err back so call sites can simply `return fail(...)`.
	fail := func(err error, msg string) error {
		cm.log("Core", msg)
		cm.cleanup()
		cm.setState(Error)
		return err
	}
	// aborted handles cancellation of the attempt; it is not reported as an
	// error to the caller.
	aborted := func() error {
		cm.log("Core", "Connection cancelled.")
		cm.cleanup()
		cm.setState(Disconnected)
		return nil
	}

	// 0. Kill stale processes and give the OS a moment to release them.
	cm.killStaleProcesses()
	if err := sleepCtx(ctx, 1*time.Second); err != nil {
		return aborted()
	}

	// 1. Parse link
	cm.log("Core", "Parsing link...")
	proxyLink, err := ParseLink(link)
	if err != nil {
		return fail(err, fmt.Sprintf("Failed to parse link: %v", err))
	}
	cm.mu.Lock()
	cm.CurrentServer = proxyLink
	cm.mu.Unlock()
	cm.log("Core", fmt.Sprintf("Server: %s", proxyLink))

	// 2. Generate configs
	cm.log("Core", "Generating configurations...")
	xrayConfig, err := GenerateXrayConfig(proxyLink, DefaultSocksPort)
	if err != nil {
		return fail(err, fmt.Sprintf("Failed to generate xray config: %v", err))
	}
	singBoxConfig, err := GenerateSingBoxConfig(proxyLink.Address, DefaultSocksPort, "kettuTun")
	if err != nil {
		return fail(err, fmt.Sprintf("Failed to generate sing-box config: %v", err))
	}

	// 3. Save configs. The paths are published under the lock because cleanup
	// reads and resets them under the same lock.
	xrayConfigPath := filepath.Join(cm.configsPath, "xray-config.json")
	singBoxConfigPath := filepath.Join(cm.configsPath, "singbox-config.json")
	cm.mu.Lock()
	cm.xrayConfigPath = xrayConfigPath
	cm.singBoxConfigPath = singBoxConfigPath
	cm.mu.Unlock()

	// The configs are derived from the proxy link, which may carry
	// credentials, so they are created readable by the owner only.
	if err := os.WriteFile(xrayConfigPath, []byte(xrayConfig), 0o600); err != nil {
		return fail(err, fmt.Sprintf("Failed to save xray config: %v", err))
	}
	if err := os.WriteFile(singBoxConfigPath, []byte(singBoxConfig), 0o600); err != nil {
		return fail(err, fmt.Sprintf("Failed to save sing-box config: %v", err))
	}
	cm.log("Core", fmt.Sprintf("Xray config saved: %s", xrayConfigPath))
	cm.log("Core", fmt.Sprintf("Sing-box config saved: %s", singBoxConfigPath))

	// 4. Start xray
	cm.log("Xray", "Starting xray-core...")
	xrayPath := filepath.Join(cm.coresPath, "xray.exe")
	xrayCmd, err := cm.startProcess(xrayPath, []string{"run", "-config", xrayConfigPath}, "Xray")
	if err != nil {
		return fail(err, fmt.Sprintf("Failed to start xray: %v", err))
	}
	cm.mu.Lock()
	cm.xrayProcess = xrayCmd
	cm.mu.Unlock()

	// 5. Wait for SOCKS5 port
	cm.log("Core", fmt.Sprintf("Waiting for SOCKS5 port %d...", DefaultSocksPort))
	if err := waitForPort(ctx, DefaultSocksPort, 10*time.Second); err != nil {
		// Keep the cause (deadline vs. cancellation) available to callers.
		portErr := fmt.Errorf("xray failed to open port %d within timeout: %w", DefaultSocksPort, err)
		return fail(portErr, fmt.Sprintf("Xray failed to open port %d: %v", DefaultSocksPort, err))
	}
	cm.log("Core", "SOCKS5 port ready.")
	if ctx.Err() != nil {
		return aborted()
	}

	// 6. Start sing-box
	cm.log("SingBox", "Starting sing-box (TUN)...")
	singBoxPath := filepath.Join(cm.coresPath, "sing-box.exe")
	singBoxCmd, err := cm.startProcess(singBoxPath, []string{"run", "-c", singBoxConfigPath}, "SingBox")
	if err != nil {
		return fail(err, fmt.Sprintf("Failed to start sing-box: %v", err))
	}
	cm.mu.Lock()
	cm.singBoxProcess = singBoxCmd
	cm.mu.Unlock()

	// 7. Wait for TUN setup
	if err := sleepCtx(ctx, 2*time.Second); err != nil {
		return aborted()
	}
	// The sleep can finish at the same instant the context is cancelled, so
	// re-check before declaring success.
	if ctx.Err() != nil {
		return aborted()
	}

	// Check both processes are alive. The local handles are used rather than
	// the struct fields because a concurrent cleanup resets those to nil.
	if exited := xrayCmd.ProcessState; exited != nil {
		err := fmt.Errorf("xray exited with code %d", exited.ExitCode())
		return fail(err, err.Error())
	}
	if exited := singBoxCmd.ProcessState; exited != nil {
		err := fmt.Errorf("sing-box exited with code %d", exited.ExitCode())
		return fail(err, err.Error())
	}

	cm.setState(Connected)
	cm.log("Core", fmt.Sprintf("Connected to %s (%s:%d)", proxyLink.Remark, proxyLink.Address, proxyLink.Port))
	return nil
}
// Disconnect tears down the VPN connection. It is a no-op when the manager is
// already Disconnected or Disconnecting; otherwise it moves through
// Disconnecting, cancels the context of the current Connect attempt, stops
// the processes, forgets the current server and ends in Disconnected.
func (cm *CoreManager) Disconnect() {
	cm.mu.Lock()
	idle := cm.State == Disconnected || cm.State == Disconnecting
	cm.mu.Unlock()
	if idle {
		return
	}

	cm.setState(Disconnecting)
	cm.log("Core", "Disconnecting...")

	// Abort any waits that Connect may still be blocked in.
	cm.mu.Lock()
	if stop := cm.cancel; stop != nil {
		stop()
	}
	cm.mu.Unlock()

	cm.cleanup()

	cm.mu.Lock()
	cm.CurrentServer = nil
	cm.mu.Unlock()

	cm.setState(Disconnected)
	cm.log("Core", "Disconnected.")
}
// startProcess launches the binary at path with args and wires its output
// into the manager's log under the given source label.
//
// The working directory is set to the binary's directory. stdout lines are
// logged verbatim; stderr lines go through markIfError first. A background
// goroutine waits for the process to exit, logs the exit code and, if the
// process is still one of the manager's active processes while the state is
// Connected or Connecting, sets the Error state and cancels the connect
// context.
//
// It returns the started command, or an error when the binary is missing, a
// pipe cannot be created, or the process fails to start.
func (cm *CoreManager) startProcess(path string, args []string, source string) (*exec.Cmd, error) {
	if _, err := os.Stat(path); err != nil {
		return nil, fmt.Errorf("binary not found: %s", path)
	}

	cmd := exec.Command(path, args...)
	cmd.Dir = filepath.Dir(path)
	cmd.SysProcAttr = procAttr() // platform-specific: hide window on Windows

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("failed to open stdout pipe for %s: %w", source, err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		_ = stdout.Close() // the command is never started; do not keep the pipe open
		return nil, fmt.Errorf("failed to open stderr pipe for %s: %w", source, err)
	}

	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("failed to start %s: %w", source, err)
	}
	cm.log(source, fmt.Sprintf("Process started (PID: %d)", cmd.Process.Pid))

	// Read stdout/stderr in background.
	var readers sync.WaitGroup
	readers.Add(2)
	go func() {
		defer readers.Done()
		scanPipe(stdout, func(line string) { cm.log(source, line) })
	}()
	go func() {
		defer readers.Done()
		scanPipe(stderr, func(line string) { cm.log(source, markIfError(line)) })
	}()

	// Monitor process exit.
	go func() {
		// Wait closes the pipes, so it must not run until both readers have
		// finished; otherwise the last lines of output can be lost.
		readers.Wait()
		_ = cmd.Wait() // the exit code is reported via ProcessState below
		cm.log(source, fmt.Sprintf("Process exited (code: %d)", cmd.ProcessState.ExitCode()))

		cm.mu.Lock()
		state := cm.State
		// cleanup resets the process fields before killing, so a process we
		// stopped on purpose is no longer "ours" here.
		isOurs := cmd == cm.xrayProcess || cmd == cm.singBoxProcess
		cm.mu.Unlock()

		if isOurs && (state == Connected || state == Connecting) {
			cm.log(source, fmt.Sprintf("CRITICAL: Process %s crashed! Cleaning up...", source))
			cm.setState(Error)
			cm.mu.Lock()
			if cm.cancel != nil {
				cm.cancel()
			}
			cm.mu.Unlock()
		}
	}()

	return cmd, nil
}
// waitForPort polls 127.0.0.1:port until a TCP connection succeeds.
//
// It returns nil as soon as one dial succeeds. It returns the context error
// (context.Canceled or context.DeadlineExceeded) when ctx is cancelled or
// timeout elapses first. Both the dial and the pause between attempts honour
// the context, so cancellation is observed promptly.
func waitForPort(ctx context.Context, port int, timeout time.Duration) error {
	const retryInterval = 300 * time.Millisecond

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	addr := fmt.Sprintf("127.0.0.1:%d", port)
	dialer := net.Dialer{Timeout: 1 * time.Second}

	for {
		// Do not even attempt a dial once the context is finished.
		if err := ctx.Err(); err != nil {
			return err
		}

		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err == nil {
			_ = conn.Close() // only reachability matters; the close error is irrelevant
			return nil
		}

		pause := time.NewTimer(retryInterval)
		select {
		case <-ctx.Done():
			pause.Stop()
			return ctx.Err()
		case <-pause.C:
		}
	}
}
// cleanup stops the managed processes and deletes the generated config files.
//
// The process handles and config paths are taken out of the struct (and
// reset) while holding the lock; the slow work of killing processes and
// removing files then happens without the lock. Calling cleanup again after
// that is harmless because everything has been reset.
func (cm *CoreManager) cleanup() {
	cm.mu.Lock()
	tunProc, proxyProc := cm.singBoxProcess, cm.xrayProcess
	cm.singBoxProcess, cm.xrayProcess = nil, nil
	proxyCfg, tunCfg := cm.xrayConfigPath, cm.singBoxConfigPath
	cm.xrayConfigPath, cm.singBoxConfigPath = "", ""
	cm.mu.Unlock()

	// Stop sing-box first (TUN), then xray.
	stopProcess(tunProc, "SingBox", cm.log)
	stopProcess(proxyProc, "Xray", cm.log)

	for _, cfg := range []string{proxyCfg, tunCfg} {
		tryDeleteFile(cfg)
	}
}
// stopProcess kills the process behind cmd and reports progress through
// logFn using the given source label. It does nothing when cmd is nil, was
// never started, or has already exited.
func stopProcess(cmd *exec.Cmd, source string, logFn func(string, string)) {
	// Nothing to stop: no command, not started, or already reaped.
	if cmd == nil || cmd.Process == nil || cmd.ProcessState != nil {
		return
	}

	pid := cmd.Process.Pid
	logFn(source, fmt.Sprintf("Terminating process (PID: %d)...", pid))

	err := cmd.Process.Kill()
	if err == nil {
		logFn(source, "Process terminated.")
		return
	}
	logFn(source, fmt.Sprintf("Error terminating process: %v", err))
}
// tryDeleteFile removes the file at path on a best-effort basis. An empty
// path is ignored and any removal error is deliberately discarded.
func tryDeleteFile(path string) {
	if path != "" {
		_ = os.Remove(path) // best effort: a missing or locked file is not fatal
	}
}
// killStaleProcesses asks killProcessByName to terminate any running "xray"
// and "sing-box" processes, logging each PID it reports.
func (cm *CoreManager) killStaleProcesses() {
	targets := [...]string{"xray", "sing-box"}
	for i := range targets {
		procName := targets[i]
		killProcessByName(procName, func(pid int) {
			cm.log("Core", fmt.Sprintf("Killing stale %s process (PID: %d)", procName, pid))
		})
	}
}
// setState records the new connection state and notifies OnStateChanged.
// Setting the state it already has is a no-op and produces no notification.
// The handler is invoked after the lock has been released.
func (cm *CoreManager) setState(state ConnectionState) {
	notify := func() StateChangedHandler {
		cm.mu.Lock()
		defer cm.mu.Unlock()
		if cm.State == state {
			return nil
		}
		cm.State = state
		return cm.OnStateChanged
	}()

	if notify != nil {
		notify(state)
	}
}
// log forwards a message to OnLog, if one is set. The handler is read under
// the lock but called after the lock has been released.
func (cm *CoreManager) log(source, message string) {
	cm.mu.Lock()
	sink := cm.OnLog
	cm.mu.Unlock()

	if sink == nil {
		return
	}
	sink(source, message)
}
// markIfError returns line prefixed with "[ERR] " when it mentions an error,
// fatal or panic level (matched case-insensitively); otherwise it returns
// line unchanged. sing-box and xray write INFO/WARN/ERROR alike to stderr, so
// stderr output cannot be treated as an error wholesale.
func markIfError(line string) string {
	normalized := strings.ToUpper(line)
	for _, level := range [...]string{"ERROR", "FATAL", "PANIC"} {
		if strings.Contains(normalized, level) {
			return "[ERR] " + line
		}
	}
	return line
}
// sleepCtx blocks for d or until ctx is done, whichever comes first.
// It returns nil when the full duration elapsed and ctx.Err() when the
// context finished first. The timer is stopped on return so it is not kept
// alive for the rest of d after an early cancellation.
func sleepCtx(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}
// Close cancels the context of the current Connect attempt, if any, and then
// stops the managed processes and removes the generated config files. It does
// not change State or CurrentServer.
func (cm *CoreManager) Close() {
	cm.mu.Lock()
	stop := cm.cancel
	if stop != nil {
		stop()
	}
	cm.mu.Unlock()

	cm.cleanup()
}