Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 60 additions & 3 deletions components/backend/handlers/oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ func HandleOAuth2Callback(c *gin.Context) {
callbackData.ExpiresIn = tokenData.ExpiresIn
callbackData.TokenType = tokenData.TokenType

// Fetch user email from provider (if supported)
userEmail := ""
if provider == "google" {
email, err := fetchGoogleUserEmail(c.Request.Context(), tokenData.AccessToken)
if err != nil {
log.Printf("Warning: Failed to fetch user email from Google: %v", err)
} else {
userEmail = email
log.Printf("Fetched user email from Google OAuth: %s", userEmail)
}
}

// Parse and validate session context from signed state parameter
stateData, err := validateAndParseOAuthState(state)
if err != nil {
Expand All @@ -319,6 +331,7 @@ func HandleOAuth2Callback(c *gin.Context) {
tokenData.AccessToken,
tokenData.RefreshToken,
tokenData.ExpiresIn,
userEmail,
)
if err != nil {
log.Printf("Failed to store credentials in Secret: %v", err)
Expand Down Expand Up @@ -397,6 +410,45 @@ func exchangeOAuthCode(ctx context.Context, provider *OAuthProvider, code string
return &tokenResp, nil
}

// GoogleUserInfo represents the minimal user info response from Google (email only)
type GoogleUserInfo struct {
Email string `json:"email"`
VerifiedEmail bool `json:"verified_email"`
}

// fetchGoogleUserEmail fetches the user's email from Google's userinfo endpoint
func fetchGoogleUserEmail(ctx context.Context, accessToken string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://www.googleapis.com/oauth2/v2/userinfo", nil)
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", accessToken))

client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("failed to fetch user info: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("userinfo request failed with status %d: %s", resp.StatusCode, string(body))
}

var userInfo GoogleUserInfo
if err := json.NewDecoder(resp.Body).Decode(&userInfo); err != nil {
return "", fmt.Errorf("failed to decode user info: %w", err)
}

if userInfo.Email == "" {
return "", fmt.Errorf("no email in user info response")
}

return userInfo.Email, nil
}

// storeOAuthCallback stores OAuth callback data in a Secret for retrieval by MCP or other consumers
func storeOAuthCallback(ctx context.Context, state string, data *OAuthCallbackData) error {
if state == "" {
Expand Down Expand Up @@ -662,7 +714,7 @@ func validateAndParseOAuthState(state string) (*OAuthStateData, error) {
// Secret name: {sessionName}-{provider}-oauth (e.g., agentic-session-123-google-oauth)
// This allows the session pod to mount or read the credentials from its own namespace
// The Secret is owned by the AgenticSession CR, so it's automatically deleted when the session is deleted
func storeCredentialsInSecret(ctx context.Context, projectName, sessionName, provider, accessToken, refreshToken string, expiresIn int64) error {
func storeCredentialsInSecret(ctx context.Context, projectName, sessionName, provider, accessToken, refreshToken string, expiresIn int64, userEmail string) error {
secretName := fmt.Sprintf("%s-%s-oauth", sessionName, provider)

// Get OAuth provider config for client_id and client_secret
Expand Down Expand Up @@ -733,6 +785,11 @@ func storeCredentialsInSecret(ctx context.Context, projectName, sessionName, pro
},
}

// Add user email to Secret data if available
if userEmail != "" {
secret.Data["user_email"] = []byte(userEmail)
}

// Try to create the Secret
_, err = K8sClient.CoreV1().Secrets(projectName).Create(ctx, secret, v1.CreateOptions{})
if err != nil {
Expand All @@ -742,12 +799,12 @@ func storeCredentialsInSecret(ctx context.Context, projectName, sessionName, pro
if err != nil {
return fmt.Errorf("failed to update Secret %s/%s: %w", projectName, secretName, err)
}
log.Printf("✓ Updated OAuth credentials Secret %s/%s", projectName, secretName)
log.Printf("✓ Updated OAuth credentials Secret %s/%s (email: %s)", projectName, secretName, userEmail)
} else {
return fmt.Errorf("failed to create Secret %s/%s: %w", projectName, secretName, err)
}
} else {
log.Printf("✓ Created OAuth credentials Secret %s/%s", projectName, secretName)
log.Printf("✓ Created OAuth credentials Secret %s/%s (email: %s)", projectName, secretName, userEmail)
}

return nil
Expand Down
14 changes: 14 additions & 0 deletions components/backend/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,19 @@ func CreateSession(c *gin.Context) {
displayName = s
}
}

// Extract email from authenticated user
email := ""
if v, ok := c.Get("userEmail"); ok {
if s, ok2 := v.(string); ok2 {
email = strings.TrimSpace(s)
}
}
// Fallback to userID if no explicit email (userID is often the email)
if email == "" {
email = uid
}

groups := []string{}
if v, ok := c.Get("userGroups"); ok {
if gg, ok2 := v.([]string); ok2 {
Expand All @@ -679,6 +692,7 @@ func CreateSession(c *gin.Context) {
session["spec"].(map[string]interface{})["userContext"] = map[string]interface{}{
"userId": uid,
"displayName": displayName,
"email": email,
"groups": groups,
}
}
Expand Down
1 change: 1 addition & 0 deletions components/backend/types/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type GitRepository struct {
type UserContext struct {
UserID string `json:"userId" binding:"required"`
DisplayName string `json:"displayName" binding:"required"`
Email string `json:"email,omitempty"`
Groups []string `json:"groups" binding:"required"`
}

Expand Down
31 changes: 30 additions & 1 deletion components/operator/internal/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,15 +982,19 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
// Extract userContext for observability and auditing
userID := ""
userName := ""
userEmail := ""
if userContext, found, _ := unstructured.NestedMap(spec, "userContext"); found {
if v, ok := userContext["userId"].(string); ok {
userID = strings.TrimSpace(v)
}
if v, ok := userContext["displayName"].(string); ok {
userName = strings.TrimSpace(v)
}
if v, ok := userContext["email"].(string); ok {
userEmail = strings.TrimSpace(v)
}
}
log.Printf("Session %s initiated by user: %s (userId: %s)", name, userName, userID)
log.Printf("Session %s initiated by user: %s (userId: %s, email: %s)", name, userName, userID, userEmail)

// Create the Job
job := &batchv1.Job{
Expand Down Expand Up @@ -1108,6 +1112,29 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
},

Env: func() []corev1.EnvVar {
// Determine Google email priority:
// 1. From OAuth secret (if user completed OAuth flow)
// 2. From session userContext (if provided at creation)
// 3. Default to user@example.com
googleEmail := "user@example.com"

// Try to read email from OAuth secret first
googleOAuthSecretName := fmt.Sprintf("%s-google-oauth", name)
if oauthSecret, err := config.K8sClient.CoreV1().Secrets(sessionNamespace).Get(context.TODO(), googleOAuthSecretName, v1.GetOptions{}); err == nil {
if oauthSecret.Data != nil {
if emailBytes, ok := oauthSecret.Data["user_email"]; ok && len(emailBytes) > 0 {
googleEmail = string(emailBytes)
log.Printf("Using Google email from OAuth secret for session %s: %s", name, googleEmail)
}
}
}

// Fallback to session userContext email if OAuth email not available
if googleEmail == "user@example.com" && userEmail != "" {
googleEmail = userEmail
log.Printf("Using Google email from session userContext for session %s: %s", name, googleEmail)
}

base := []corev1.EnvVar{
{Name: "DEBUG", Value: "true"},
{Name: "INTERACTIVE", Value: fmt.Sprintf("%t", interactive)},
Expand All @@ -1122,6 +1149,8 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
// Google OAuth client credentials for workspace-mcp
{Name: "GOOGLE_OAUTH_CLIENT_ID", Value: os.Getenv("GOOGLE_OAUTH_CLIENT_ID")},
{Name: "GOOGLE_OAUTH_CLIENT_SECRET", Value: os.Getenv("GOOGLE_OAUTH_CLIENT_SECRET")},
// User email for Google Workspace MCP tools (defaults to user@example.com)
{Name: "USER_GOOGLE_EMAIL", Value: googleEmail},
}

// Add user context for observability and auditing (Langfuse userId, logs, etc.)
Expand Down
3 changes: 2 additions & 1 deletion components/runners/claude-code-runner/.mcp.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"GOOGLE_MCP_CREDENTIALS_DIR": "${GOOGLE_MCP_CREDENTIALS_DIR}",
"MCP_SINGLE_USER_MODE": "1",
"GOOGLE_OAUTH_CLIENT_ID": "${GOOGLE_OAUTH_CLIENT_ID}",
"GOOGLE_OAUTH_CLIENT_SECRET": "${GOOGLE_OAUTH_CLIENT_SECRET}"
"GOOGLE_OAUTH_CLIENT_SECRET": "${GOOGLE_OAUTH_CLIENT_SECRET}",
"USER_GOOGLE_EMAIL": "${USER_GOOGLE_EMAIL}"
}
}
}
Expand Down
64 changes: 57 additions & 7 deletions components/runners/claude-code-runner/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ async def initialize(self, context: RunnerContext):
self.context = context
logger.info(f"Initialized Claude Code adapter for session {context.session_id}")

# Set default USER_GOOGLE_EMAIL if not provided
if not os.getenv("USER_GOOGLE_EMAIL"):
os.environ["USER_GOOGLE_EMAIL"] = "user@example.com"
logger.info("USER_GOOGLE_EMAIL not set, using default: user@example.com")
else:
logger.info(f"USER_GOOGLE_EMAIL set to: {os.getenv('USER_GOOGLE_EMAIL')}")

# Copy Google OAuth credentials from mounted Secret to writable workspace location
await self._setup_google_credentials()

Expand Down Expand Up @@ -1520,19 +1527,21 @@ async def _try_copy_google_credentials(self) -> bool:

async def refresh_google_credentials(self) -> bool:
"""Check for and copy new Google OAuth credentials.

Call this method periodically (e.g., before processing a message) to detect
when a user completes the OAuth flow and credentials become available.

Kubernetes automatically updates the mounted secret volume when the secret
changes (typically within ~60 seconds), so this will pick up new credentials
without requiring a pod restart.


Also updates USER_GOOGLE_EMAIL environment variable from credentials if available.

Returns:
True if new credentials were found and copied, False otherwise.
"""
dest_path = Path("/workspace/.google_workspace_mcp/credentials/credentials.json")

# If we already have credentials in workspace, check if source is newer
if dest_path.exists():
secret_path = Path("/app/.google_workspace_mcp/credentials/credentials.json")
Expand All @@ -1541,13 +1550,54 @@ async def refresh_google_credentials(self) -> bool:
# Compare modification times - secret mount updates when K8s syncs
if secret_path.stat().st_mtime > dest_path.stat().st_mtime:
logging.info("Detected updated Google OAuth credentials, refreshing...")
return await self._try_copy_google_credentials()
if await self._try_copy_google_credentials():
# Update USER_GOOGLE_EMAIL from the new credentials
self._update_email_from_credentials(dest_path)
return True
except OSError:
pass
# Always try to update email even if file didn't change (in case env var is still default)
self._update_email_from_credentials(dest_path)
return False

# No credentials yet, try to copy
if await self._try_copy_google_credentials():
logging.info("✓ Google OAuth credentials now available (user completed authentication)")
# Update USER_GOOGLE_EMAIL from the new credentials
self._update_email_from_credentials(dest_path)
return True
return False
return False

def _update_email_from_credentials(self, creds_path: Path):
"""Read user email from credentials file and update USER_GOOGLE_EMAIL env var.

The email is extracted from the OAuth credentials stored by the backend.
"""
try:
# First check if there's a user_email file in the secret mount
secret_email_path = Path("/app/.google_workspace_mcp/credentials/user_email")
if secret_email_path.exists():
email = secret_email_path.read_text().strip()
if email and email != "user@example.com":
current_email = os.getenv("USER_GOOGLE_EMAIL", "")
if current_email != email:
os.environ["USER_GOOGLE_EMAIL"] = email
logging.info(f"Updated USER_GOOGLE_EMAIL from secret: {email}")
return

# Fallback: try to parse email from credentials.json (some OAuth providers include it)
if creds_path.exists() and creds_path.stat().st_size > 0:
try:
with open(creds_path, 'r') as f:
creds = _json.load(f)
# Some OAuth responses include email in the credentials
email = creds.get('email') or creds.get('user_email')
if email and email != "user@example.com":
current_email = os.getenv("USER_GOOGLE_EMAIL", "")
if current_email != email:
os.environ["USER_GOOGLE_EMAIL"] = email
logging.info(f"Updated USER_GOOGLE_EMAIL from credentials.json: {email}")
except (_json.JSONDecodeError, KeyError):
pass
except Exception as e:
logging.debug(f"Could not update USER_GOOGLE_EMAIL from credentials: {e}")
Loading