unified_sink_detect¶
Detect dangerous sinks (SQL execution, command execution, file operations, etc.) across multiple programming languages with confidence scoring.
Quick Reference¶
unified_sink_detect(
code: str, # Code to analyze
language: str, # Programming language
confidence_threshold: float = 0.7 # Minimum confidence
) -> SinkDetectionResult
User Stories¶
| Persona | Story | Tool Value |
|---|---|---|
| 🛡️ Marcus (Security Engineer) | "Find dangerous sinks (eval, exec, system) across Python/JS/TS/Java" | Polyglot sink detection |
| 🔧 Chris (OSS Contributor) | "Detect dangerous operations consistently across multiple languages" | Unified security analysis |
| 👥 David (Team Lead) | "Identify risky code patterns across the entire stack" | Full-stack security |
Parameters¶
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
code | string | Yes | - | Source code to analyze |
language | string | Yes | - | python, javascript, typescript, java |
confidence_threshold | float | No | 0.7 | Minimum confidence (0.0-1.0) |
Supported Languages¶
| Language | Sink Categories |
|---|---|
| Python | SQL, Command, File, Network, Template, Crypto |
| JavaScript | SQL, Command, File, XSS, Network, Eval |
| TypeScript | SQL, Command, File, XSS, Network, Eval |
| Java | SQL, Command, File, LDAP, XML, Crypto |
Response Schema¶
{
"data": {
"sinks": [
{
"type": "string",
"category": "string",
"line": "integer",
"column": "integer",
"code": "string",
"function_name": "string",
"confidence": "float",
"cwe": "string",
"description": "string"
}
],
"summary": {
"total_sinks": "integer",
"by_category": {
"sql": "integer",
"command": "integer",
"file": "integer",
"xss": "integer"
},
"high_confidence": "integer",
"medium_confidence": "integer"
}
},
"tier_applied": "string",
"duration_ms": "integer"
}
Sink Categories¶
| Category | Sink Types | Risk |
|---|---|---|
sql | execute, query, raw SQL | SQL Injection |
command | system, exec, spawn | Command Injection |
file | open, readFile, writeFile | Path Traversal |
xss | innerHTML, document.write | Cross-Site Scripting |
template | render_template_string | Template Injection |
eval | eval, exec, Function() | Code Injection |
ldap | search, bind | LDAP Injection |
xml | parse, parseString | XXE |
network | request, fetch | SSRF |
Examples¶
Detect Python Sinks¶
```bash codescalpel unified-sink-detect \ --code "import os
import sqlite3
def process(user_input): os.system(f'echo {user_input}') conn = sqlite3.connect('db.sqlite') conn.execute(f'SELECT * FROM users WHERE id = {user_input}') open(f'/tmp/{user_input}', 'r')" \ --language python ```
{
"data": {
"sinks": [
{
"type": "os.system",
"category": "command",
"line": 5,
"code": "os.system(f'echo {user_input}')",
"function_name": "process",
"confidence": 0.95,
"cwe": "CWE-78",
"description": "Command execution sink - potential command injection"
},
{
"type": "sqlite3.execute",
"category": "sql",
"line": 7,
"code": "conn.execute(f'SELECT * FROM users WHERE id = {user_input}')",
"confidence": 0.95,
"cwe": "CWE-89",
"description": "SQL execution sink - potential SQL injection"
},
{
"type": "open",
"category": "file",
"line": 8,
"code": "open(f'/tmp/{user_input}', 'r')",
"confidence": 0.85,
"cwe": "CWE-22",
"description": "File operation sink - potential path traversal"
}
],
"summary": {
"total_sinks": 3,
"by_category": {"command": 1, "sql": 1, "file": 1},
"high_confidence": 3
}
},
"tier_applied": "community",
"duration_ms": 45
}
Detect JavaScript Sinks¶
{
"code": "const { exec } = require('child_process');\nconst mysql = require('mysql');\n\nfunction handleRequest(userInput) {\n exec(`ls ${userInput}`);\n document.innerHTML = userInput;\n connection.query(`SELECT * FROM users WHERE name = '${userInput}'`);\n eval(userInput);\n}",
"language": "javascript"
}
```bash codescalpel unified-sink-detect \ --code "const { exec } = require('child_process');
const mysql = require('mysql');
function handleRequest(userInput) { exec(`ls ${userInput}`); document.innerHTML = userInput; connection.query(`SELECT * FROM users WHERE name = '${userInput}'`); eval(userInput); }" \ --language javascript ```
{
"data": {
"sinks": [
{
"type": "exec",
"category": "command",
"line": 5,
"confidence": 0.95,
"cwe": "CWE-78"
},
{
"type": "innerHTML",
"category": "xss",
"line": 6,
"confidence": 0.90,
"cwe": "CWE-79"
},
{
"type": "mysql.query",
"category": "sql",
"line": 7,
"confidence": 0.95,
"cwe": "CWE-89"
},
{
"type": "eval",
"category": "eval",
"line": 8,
"confidence": 0.98,
"cwe": "CWE-94"
}
],
"summary": {
"total_sinks": 4,
"by_category": {"command": 1, "xss": 1, "sql": 1, "eval": 1}
}
}
}
Detect Java Sinks¶
{
"code": "import java.sql.*;\nimport java.io.*;\n\npublic class Handler {\n public void process(String input) {\n Runtime.getRuntime().exec(\"cmd \" + input);\n Statement stmt = conn.createStatement();\n stmt.executeQuery(\"SELECT * FROM users WHERE id = \" + input);\n new FileInputStream(input);\n }\n}",
"language": "java"
}
```bash codescalpel unified-sink-detect \ --code "import java.sql.*;
import java.io.*;
public class Handler { public void process(String input) { Runtime.getRuntime().exec(\"cmd \" + input); Statement stmt = conn.createStatement(); stmt.executeQuery(\"SELECT * FROM users WHERE id = \" + input); new FileInputStream(input); } }" \ --language java ```
{
"data": {
"sinks": [
{
"type": "Runtime.exec",
"category": "command",
"line": 6,
"confidence": 0.95,
"cwe": "CWE-78"
},
{
"type": "Statement.executeQuery",
"category": "sql",
"line": 8,
"confidence": 0.95,
"cwe": "CWE-89"
},
{
"type": "FileInputStream",
"category": "file",
"line": 9,
"confidence": 0.85,
"cwe": "CWE-22"
}
]
}
}
With Lower Threshold¶
Confidence Levels¶
| Score | Meaning | Example |
|---|---|---|
| 0.9-1.0 | Definite sink | os.system(), eval() |
| 0.7-0.9 | Very likely sink | subprocess.call(), open() |
| 0.5-0.7 | Possible sink | requests.get(), json.loads() |
| <0.5 | Unlikely sink | Common utility functions |
Tier Limits¶
unified_sink_detect capabilities vary by tier:
| Feature | Community | Pro | Enterprise |
|---|---|---|---|
| Supported languages | Python, JS, TS, Java | Python, JS, TS, Java | Python, JS, TS, Java |
| Max sinks returned | 50 | Unlimited | Unlimited |
| Basic sink detection | ✅ | ✅ | ✅ |
| Confidence scoring | ✅ | ✅ | ✅ |
| Custom sinks | ❌ | ✅ | ✅ |
| Framework-aware | ❌ | ✅ Django, Flask, Express | ✅ All frameworks |
| Sink suppression | ❌ | ❌ | ✅ Rule-based |
| Context analysis | Basic | ✅ Enhanced | ✅ Full |
Community Tier¶
- ✅ Detect dangerous sinks across Python, JavaScript, TypeScript, Java
- ✅ Confidence scoring for each sink
- ✅ Standard sink categories (SQL, command, file, XSS, eval, etc.)
- ✅ CWE mapping for each sink type
- ⚠️ Limited to 50 sinks - May truncate results for large files
- ❌ No custom sink definitions
- ❌ No framework-specific detection
- ❌ No sink suppression rules
Pro Tier¶
- ✅ All Community features
- ✅ Unlimited sinks - Complete detection list
- ✅ Custom sink definitions - Add organization-specific sinks
- ✅ Framework-aware detection - Understand Django, Flask, Express patterns
- ✅ Enhanced context analysis - Better false positive reduction
- ✅ Sink categorization - Group sinks by risk level
Enterprise Tier¶
- ✅ All Pro features
- ✅ All framework support - Not just Django/Flask/Express
- ✅ Rule-based sink suppression - Ignore known-safe patterns
- ✅ Full context analysis - Understand complete execution context
- ✅ Integration with SAST tools - Export to security platforms
- ✅ Custom confidence models - Train on your codebase
Key Difference: Sink Coverage and Customization - Community: 50 sinks, standard categories - Basic detection - Pro: Unlimited, custom sinks, framework-aware - Production scanning - Enterprise: Unlimited, full customization, suppression - Enterprise security
Use Cases¶
1. Quick Security Audit¶
# Scan a file for dangerous operations
result = unified_sink_detect(
code=file_contents,
language="python"
)
for sink in result.sinks:
print(f"Line {sink.line}: {sink.type} ({sink.cwe})")
2. Pre-Commit Check¶
# Check code before committing
if result.summary.total_sinks > 0:
print("Warning: Dangerous sinks detected")
# Block commit or require review
3. Polyglot Projects¶
# Scan mixed codebase
for file in project_files:
lang = detect_language(file)
result = unified_sink_detect(code=file.read(), language=lang)
Best Practices¶
- Use with security_scan - Sinks need sources for full analysis
- Review all sinks - Not all sinks are vulnerabilities
- Lower threshold for audits - Catch more potential issues
- Framework context matters - Some sinks are safe in frameworks
- Combine with taint tracking - Sink alone ≠ vulnerability
Related Tools¶
- security_scan - Full taint analysis
- cross_file_security_scan - Multi-file analysis