Securing DSPy's MCP Integration: Programmatic AI Meets Enterprise Security

When DSPy’s programmatic optimization framework meets the Model Context Protocol (MCP), security becomes both more critical and mo

Rick Hightower

Originally published on Medium.

When DSPy’s programmatic optimization framework meets the Model Context Protocol (MCP), security becomes both more critical and mo

Securing DSPy's MCP Integration: Programmatic AI Meets Enterprise Security

Securing DSPy's MCP Integration: Programmatic AI Meets Enterprise Security

# Define DSPy signatures for customer service tasks
class
 
CustomerServiceSignature
(dspy.Signature):
    
"""Handle customer service requests using available tools."""
    request: 
str
 = dspy.InputField(desc=
"Customer service request"
)
    response: 
str
 = dspy.OutputField(desc=
"Helpful customer service response"
)
async 
def
 
get_oauth_token
(
self
) -> 
str:
    
""
"Obtain OAuth access token using client credentials flow."
""
    current_time = time.time()
    
# Check if we have a valid token
    
if
 
self
.access_token 
and
 current_time < 
self
.token_expires_at - 
60
:
        
return
 
self
.access_token
    
# Request new token using the configured HTTP client
    response = await 
self
.http_client.post(
        
self
.oauth_config[
'token_url'
],
        data={
            
'grant_type'
: 
'client_credentials'
,
            
'client_id'
: 
self
.oauth_config[
'client_id'
],
            
'client_secret'
: 
self
.oauth_config[
'client_secret'
],
            
'scope'
: 
self
.oauth_config[
'scopes'
]
        }
    )
    
if
 response.status_code != 
200
:
        raise Exception(f
"OAuth token request failed: {response.text}"
)
    token_data = response.json()
    
self
.access_token = token_data[
'access_token'
]
    
# Calculate token expiration
    expires_in = token_data.get(
'expires_in'
, 
3600
)
    
self
.token_expires_at = current_time + expires_in
    
return
 
self
.access_token

Securing DSPy's MCP Integration: Programmatic AI Meets Enterprise Security

async def 
_verify_token_scopes
(
self
, 
required_scopes
: List[str]) -> 
bool
:
    
""
"Verify the current token has required scopes with proper JWT signature verification."
""
    
if
 not 
self
.access_token:
        
return
 False
    
try
:
        
# Get the OAuth server's public key for verification
        public_key_jwk = await 
self
.
get_oauth_public_key
()
        
if
 public_key_jwk:
            
# Proper JWT verification with signature check
            
try
:
                
# Convert JWK to PEM format for PyJWT
                
from
 jwt.algorithms import RSAAlgorithm
                public_key = RSAAlgorithm.
from_jwk
(public_key_jwk)
                
# Verify JWT with full signature validation
                payload = jwt.
decode
(
                    
self
.access_token,
                    key=public_key,
                    algorithms=[
"RS256"
],
                    audience=
self
.oauth_config.
get
(
'client_id'
),  # Verify audience
                    issuer=
self
.oauth_config.
get
(
'token_url'
, 
''
).
replace
(
'/token'
, 
''
)  # Verify issuer
                )
                
print
(
"✅ JWT signature verification successful"
)
            except jwt.InvalidTokenError 
as
 e:
                
print
(f
"❌ JWT signature verification failed: {e}"
)
                
return
 False
        
else
:
            
# Fallback to unverified decode if public key unavailable
            
print
(
"⚠️  Using unverified JWT decode (development only)"
)
            payload = jwt.
decode
(
                
self
.access_token,
                options={
"verify_signature"
: False}
            )
        
# Check scopes
        token_scopes = payload.get(
'scope'
, 
''
).split()
        has_required_scopes = 
all
(scope 
in
 token_scopes 
for
 scope 
in
 required_scopes)
        
if
 has_required_scopes:
            
print
(
f"✅ Token has required scopes: 
{required_scopes}
"
)
        
else
:
            
print
(
f"❌ Token missing scopes. Has: 
{token_scopes}
, Needs: 
{required_scopes}
"
)
        
return
 has_required_scopes
    
except
 Exception 
as
 e:
        
print
(
f"❌ Token verification error: 
{e}
"
)
        
return
 
False
class
 
SecureMCPTool
:
    
"""Wrapper to make MCP tools compatible with DSPy."""
    
def
 
__init__
(
self, tool, session, client
):
        self.tool = tool
        self.session = session
        self.client = client
        self.name = tool.name
        self.description = tool.description
        self.input_schema = tool.inputSchema
    
async
 
def
 
__call__
(
self, **kwargs
):
        
"""Execute the MCP tool with security validation."""
        
# Verify required scopes
        required_scopes = self.client._get_required_scopes(self.name)
        
if
 
not
 
await
 self.client._verify_token_scopes(required_scopes):
            
raise
 PermissionError(
                
f"Insufficient permissions for 
{self.name}
"
            )
        
try
:
            
# Call the MCP tool
            result = 
await
 self.session.call_tool(
                self.name,
                arguments=kwargs
            )
            
# Handle rate limit responses
            
if
 
hasattr
(result, 
'error'
) 
and
 
'rate_limit'
 
in
 
str
(result.error):
                retry_after = result.metadata.get(
'retry_after'
, 
60
)
                
print
(
f"⏳ Rate limited. Waiting 
{retry_after}
 seconds..."
)
                
await
 asyncio.sleep(retry_after)
                
# Retry the tool call
                result = 
await
 self.session.call_tool(
                    self.name,
                    arguments=kwargs
                )
            
# Extract content from result
            
if
 
hasattr
(result, 
'content'
) 
and
 result.content:
                
return
 result.content[
0
].text 
if
 result.content 
else
 
""
            
else
:
                
return
 
f"Tool 
{self.name}
 completed successfully"
        
except
 httpx.HTTPStatusError 
as
 e:
            
if
 e.response.status_code == 
401
:
                
# Token expired, refresh and retry
                self.client.access_token = 
None
                
await
 self.client.get_oauth_token()
                
return
 
await
 self.__call__(**kwargs)
            
elif
 e.response.status_code == 
429
:
                
# Handle rate limiting
                retry_after = 
int
(e.response.headers.get(
'Retry-After'
, 
60
))
                
print
(
f"⏳ Rate limited by server. Waiting 
{retry_after}
 seconds..."
)
                
await
 asyncio.sleep(retry_after)
                
return
 
await
 self.__call__(**kwargs)
            
else
:
                
raise
class
 
SecureMCPTool
:
    
"""Wrapper to make MCP tools compatible with DSPy."""
def
 
__init__
(
self, tool, session, client
):
        self.tool = tool
        self.session = session
        self.client = client
        self.name = tool.name
        self.description = tool.description
        self.input_schema = tool.inputSchema
async
 
def
 
__call__
(
self, **kwargs
):
        
"""Execute the MCP tool with security validation."""
        
# Verify required scopes
        required_scopes = self.client._get_required_scopes(self.name)
        
if
 
not
 
await
 self.client._verify_token_scopes(required_scopes):
            
raise
 PermissionError(
                
f"Insufficient permissions for 
{self.name}
"
            )
try
:
            
# Call the MCP tool
            result = 
await
 self.session.call_tool(
                self.name,
                arguments=kwargs
            )
            
# Handle rate limit responses
            
if
 
hasattr
(result, 
'error'
) 
and
 
'rate_limit'
 
in
 
str
(result.error):
                retry_after = result.metadata.get(
'retry_after'
, 
60
)
                
print
(
f"⏳ Rate limited. Waiting 
{retry_after}
 seconds..."
)
                
await
 asyncio.sleep(retry_after)
                
# Retry the tool call
                result = 
await
 self.session.call_tool(
                    self.name,
                    arguments=kwargs
                )
# Extract content from result
            
if
 
hasattr
(result, 
'content'
) 
and
 result.content:
                
return
 result.content[
0
].text 
if
 result.content 
else
 
""
            
else
:
                
return
 
f"Tool 
{self.name}
 completed successfully"
except
 httpx.HTTPStatusError 
as
 e:
            
if
 e.response.status_code == 
401
:
                
# Token expired, refresh and retry
                self.client.access_token = 
None
                
await
 self.client.get_oauth_token()
                
return
 
await
 self.__call__(**kwargs)
            
elif
 e.response.status_code == 
429
:
                
# Handle rate limiting
                retry_after = 
int
(e.response.headers.get(
'Retry-After'
, 
60
))
                
print
(
f"⏳ Rate limited by server. Waiting 
{retry_after}
 seconds..."
)
                
await
 asyncio.sleep(retry_after)
                
return
 
await
 self.__call__(**kwargs)
            
else
:
                
raise
  • If the authentication token has expired (401 error), it automatically gets a new token and tries again without bothering the user.
  • If the server indicates we’re making too many requests (429 error), it waits for the specified time before trying again.
  • For any other errors, it passes them up the chain so they can be properly handled by the calling code.
  1. Enforces permission boundaries
  2. Handles authentication issues transparently
  3. Manages rate limiting gracefully
  4. Standardizes tool output for DSPy
  5. Provides resilience through intelligent retry mechanisms

Securing DSPy's MCP Integration: Programmatic AI Meets Enterprise Security

def _get_required_scopes(
self
, tool_name: 
str
) 
->
 List[
str
]:
    
""
"Map tool names to required OAuth scopes."
""
    scope_mapping = {
        
"get_customer_info"
: [
"customer:read"
],
        
"create_support_ticket"
: [
"ticket:create"
],
        
"calculate_account_value"
: [
"account:calculate"
],
        
"get_recent_customers"
: [
"customer:read"
]
    }
    
return
 scope_mapping.
get
(tool_name, [])
async
 
def
 
connect_to_secure_mcp_server
(
self
):
    
"""Connect to OAuth-protected MCP server."""
    
# Get fresh access token
    access_token = 
await
 self.get_oauth_token()
    
# Create custom httpx client factory with our CA bundle
    
def
 
custom_httpx_client_factory
(
headers=
None
, timeout=
None
, auth=
None
):
        
# Get the same CA cert path we use for the main client
        ca_cert_path = self.oauth_config.get(
'ca_cert_path'
, 
None
)
        ssl_cert_file = os.environ.get(
'SSL_CERT_FILE'
)
        
if
 ssl_cert_file 
and
 os.path.exists(ssl_cert_file):
            ca_cert_path = ssl_cert_file
            
if
 os.environ.get(
'DEBUG_SSL'
):
                
print
(
f"🔐 MCP client using SSL_CERT_FILE: 
{ssl_cert_file}
"
)
        
return
 httpx.AsyncClient(
            headers=headers,
            timeout=timeout 
if
 timeout 
else
 httpx.Timeout(
30.0
),
            auth=auth,
            verify=ca_cert_path 
if
 ca_cert_path 
else
 
True
,  
# Use proper SSL verification
            follow_redirects=
True
        )
    
# Create HTTP client with authentication headers and custom SSL verification
    http_transport = 
await
 self.exit_stack.enter_async_context(
        streamablehttp_client(
            url=self.oauth_config[
'mcp_server_url'
],
            headers={
"Authorization"
: 
f"Bearer 
{access_token}
"
},
            httpx_client_factory=custom_httpx_client_factory
        )
    )
    read, write, url_getter = http_transport
    session = 
await
 self.exit_stack.enter_async_context(
        ClientSession(read, write)
    )
    
# Initialize with auth headers
    
await
 session.initialize()
    self.sessions.append(session)
    
# Discover available tools
    response = 
await
 session.list_tools()
    
for
 tool 
in
 response.tools:
        self.tool_to_session[tool.name] = session
        self.available_tools.append(tool)
async
 
def
 
process_scenarios
(
self, scenarios: 
List
[
str
]
):
    
"""Process multiple scenarios with the DSPy agent."""
    results = []
    
for
 i, scenario 
in
 
enumerate
(scenarios, 
1
):
        
print
(
f"\n📝 Scenario 
{i}
: 
{scenario}
"
)
        
try
:
            response = 
await
 self.process_request(scenario)
            
print
(
f"🤖 DSPy Agent Response: 
{response}
"
)
            results.append({
                
"scenario"
: scenario,
                
"response"
: response,
                
"status"
: 
"success"
            })
        
except
 Exception 
as
 e:
            
print
(
f"❌ Error: 
{e}
"
)
            results.append({
                
"scenario"
: scenario,
                
"error"
: 
str
(e),
                
"status"
: 
"error"
            })
        
print
(
"─"
 * 
60
)
    
return
 results

Securing DSPy's MCP Integration: Programmatic AI Meets Enterprise Security

async
 
def
 
setup_dspy_agent
(
self
):
    
"""Set up DSPy with secure MCP tools."""
    
# Configure DSPy with the appropriate language model
    
if
 self.llm_provider == 
"openai"
:
        llm = dspy.LM(
            
f"openai/
{Config.OPENAI_MODEL}
"
,
            api_key=self.api_key
        )
    
elif
 self.llm_provider == 
"anthropic"
:
        llm = dspy.LM(
            
f"anthropic/
{Config.ANTHROPIC_MODEL}
"
,
            api_key=self.api_key
        )
    
else
:
        
raise
 ValueError(
f"Unsupported LLM provider: 
{self.llm_provider}
"
)
    dspy.configure(lm=llm)
    
# Convert MCP tools to DSPy-compatible tools
    self.dspy_tools = []
    
for
 tool 
in
 self.available_tools:
        session = self.tool_to_session[tool.name]
        dspy_tool = SecureMCPTool(tool, session, self)
        self.dspy_tools.append(dspy_tool)
    
# Create a ReAct agent with the secure tools
    self.react_agent = dspy.ReAct(
        CustomerServiceSignature,
        tools=self.dspy_tools,
        max_iters=
5
    )
async
 def 
process_request
(
self
, request: 
str
) 
->
 
str
:
    
""
"Process a customer service request using DSPy ReAct agent."
""
    
if
 not 
self
.react_agent:
        raise 
RuntimeError
(
"Agent not initialized. Call setup_dspy_agent() first."
)
    
try
:
        # Use DSPy
's
 
async
 call method
        result = 
await
 
self
.react_agent.
acall
(request=request)
        
return
 result.response
    except Exception 
as
 e:
        
return
 f
"Error processing request: {str(e)}"
  • Mass data extraction through repeated API calls
  • DoS attacks on backend services
  • Violation of privacy regulations through bulk data access
  1. Scope Enforcement: Each tool required specific scopes, preventing unauthorized data access
  2. Rate Limiting: Built-in delays prevented API flooding
  3. Audit Logging: All tool calls were logged, making the pattern visible
  4. Token Expiration: Limited the window for any potential abuse
# Configure secure HTTP client with TLS verification
ca_cert_path = oauth_config.get(
'ca_cert_path'
, 
None
)
# Check for SSL environment variables (used by mkcert script)
ssl_cert_file = os.environ.get(
'SSL_CERT_FILE'
)
if
 ssl_cert_file 
and
 os.path.exists(ssl_cert_file):
    ca_cert_path = ssl_cert_file
    
if
 os.environ.get(
'DEBUG_SSL'
):
        
print
(
f"🔐 Using SSL_CERT_FILE: 
{ssl_cert_file}
"
)
# Configure secure HTTP client with TLS verification
self.http_client = httpx.AsyncClient(
    verify=ca_cert_path 
if
 ca_cert_path 
else
 
True
,
    timeout=
30.0
)

Securing DSPy's MCP Integration: Programmatic AI Meets Enterprise Security

# Example customer service scenarios
scenarios = [
    
"For customer ABC123 create a support ticket as the bbq grill that she bought is defective."
,
    
"Check the account status for customer ABC123 and calculate their total purchase value with amounts [250.0, 175.50, 82.25]"
,
    
"Calculate account value for customer ABC123 with purchases: 
$150
.0, 
$300
.0 and 
$89
.50"
,
]
  1. Multi-tool operations: Creating tickets requires customer lookup then ticket creation
  2. Calculation with data: Tests scope boundaries between read and calculate permissions
  3. Natural language parsing: Ensures DSPy handles various input formats securely
#Securing #DSPy #MCP #Integration #Programmatic #Meets #Enterprise #Security