fix(workspace-server): http client timeouts, panic recovery, and error checks (re-created from staging #2045) #2125
@@ -61,7 +61,8 @@ func refreshEnvFromCP() error {
|
||||
req.Header.Set("Authorization", "Bearer "+adminToken)
|
||||
req.Header.Set("X-Molecule-Org-Id", orgID)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("do request: %w", err)
|
||||
}
|
||||
|
||||
@@ -89,6 +89,11 @@ func Import(
|
||||
// PluginsPath set by caller if available
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("bundle/importer: PANIC during provision start for %s: %v", wsID, r)
|
||||
}
|
||||
}()
|
||||
provCtx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
|
||||
defer cancel()
|
||||
url, err := prov.Start(provCtx, cfg)
|
||||
|
||||
@@ -1035,7 +1035,12 @@ func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID
|
||||
// completed when t.Cleanup fires. Does NOT read db.DB; idle-timer
|
||||
// management only.
|
||||
go func() {
|
||||
defer unsub()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("a2a_proxy: PANIC in SSE idle watcher for %s: %v", workspaceID, r)
|
||||
}
|
||||
unsub()
|
||||
}()
|
||||
timer := time.NewTimer(idle)
|
||||
defer timer.Stop()
|
||||
for {
|
||||
|
||||
@@ -249,7 +249,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
// parent_id-bound branch enumerates siblings, and that is already scoped to
|
||||
// one parent (one tenant).
|
||||
if parentID.Valid {
|
||||
siblings, _ := queryPeerMaps(`
|
||||
siblings, _ := queryPeerMaps(ctx, `
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -268,7 +268,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
// self-delegation 400 in a tight loop (#383). The `w.id != $2`
|
||||
// clause makes self-delegation-via-peer-list impossible regardless
|
||||
// of DB state.
|
||||
children, _ := queryPeerMaps(`
|
||||
children, _ := queryPeerMaps(ctx, `
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -281,7 +281,7 @@ func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
// propagate that corruption back to the agent as a "peer who is also
|
||||
// you" entry.
|
||||
if parentID.Valid {
|
||||
parent, _ := queryPeerMaps(`
|
||||
parent, _ := queryPeerMaps(ctx, `
|
||||
SELECT w.id, w.name, COALESCE(w.role, ''), w.tier, w.status,
|
||||
COALESCE(w.agent_card, 'null'::jsonb), COALESCE(w.url, ''),
|
||||
w.parent_id, w.active_tasks
|
||||
@@ -350,8 +350,8 @@ func filterPeersByQuery(peers []map[string]interface{}, q string) []map[string]i
|
||||
}
|
||||
|
||||
// queryPeerMaps returns clean JSON-serializable maps instead of Workspace structs.
|
||||
func queryPeerMaps(query string, args ...interface{}) ([]map[string]interface{}, error) {
|
||||
rows, err := db.DB.Query(query, args...)
|
||||
func queryPeerMaps(ctx context.Context, query string, args ...interface{}) ([]map[string]interface{}, error) {
|
||||
rows, err := db.DB.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
log.Printf("queryPeerMaps error: %v", err)
|
||||
return nil, err
|
||||
|
||||
@@ -217,7 +217,12 @@ func (h *TerminalHandler) handleLocalConnect(c *gin.Context, workspaceID string)
|
||||
// synchronously. No db.DB access on this path.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in stdout bridge: %v", r)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := resp.Reader.Read(buf)
|
||||
@@ -440,7 +445,12 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
|
||||
// goAsync-exempt (RFC internal#524 Layer 2.2): WebSocket-lifetime
|
||||
// I/O bridge; handler blocks on `done` below. No db.DB access.
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in PTY bridge: %v", r)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := ptmx.Read(buf)
|
||||
@@ -463,6 +473,11 @@ func (h *TerminalHandler) handleRemoteConnect(c *gin.Context, workspaceID, insta
|
||||
// WebSocket → PTY (stdin)
|
||||
// goAsync-exempt (RFC internal#524 Layer 2.2): see above.
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Terminal: PANIC in stdin loop: %v", r)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
_, msg, rErr := conn.ReadMessage()
|
||||
if rErr != nil {
|
||||
|
||||
@@ -556,6 +556,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create workspace"})
|
||||
return
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
maxConcurrent := payload.MaxConcurrentTasks
|
||||
if maxConcurrent <= 0 {
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -41,6 +42,11 @@ func NewMCPRateLimiter(rate int, interval time.Duration, ctx context.Context) *M
|
||||
interval: interval,
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("mcp_ratelimit: PANIC in bucket cleanup: %v", r)
|
||||
}
|
||||
}()
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
||||
@@ -3,6 +3,7 @@ package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -35,6 +36,11 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
|
||||
interval: interval,
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("ratelimit: PANIC in bucket cleanup: %v", r)
|
||||
}
|
||||
}()
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
||||
@@ -116,6 +116,11 @@ func sessionCachePut(key string, ok bool) {
|
||||
|
||||
func init() {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("session_auth: PANIC in cache sweeper: %v", r)
|
||||
}
|
||||
}()
|
||||
// Jitter startup so restarts don't align sweeps.
|
||||
time.Sleep(time.Duration(rand.Int64N(int64(sessionCacheSweepEvery))))
|
||||
t := time.NewTicker(sessionCacheSweepEvery)
|
||||
|
||||
Reference in New Issue
Block a user