Securing DSPy's MCP Integration: Programmatic AI Meets Enterprise Security
When DSPy’s programmatic optimization framework meets the Model Context Protocol (MCP), security becomes both more critical and mo
Rick Hightower
Originally published on Medium.
When DSPy’s programmatic optimization framework meets the Model Context Protocol (MCP), security becomes both more critical and mo


# Define DSPy signatures for customer service tasks
class
CustomerServiceSignature
(dspy.Signature):
"""Handle customer service requests using available tools."""
request:
str
= dspy.InputField(desc=
"Customer service request"
)
response:
str
= dspy.OutputField(desc=
"Helpful customer service response"
)
async
def
get_oauth_token
(
self
) ->
str:
""
"Obtain OAuth access token using client credentials flow."
""
current_time = time.time()
# Check if we have a valid token
if
self
.access_token
and
current_time <
self
.token_expires_at -
60
:
return
self
.access_token
# Request new token using the configured HTTP client
response = await
self
.http_client.post(
self
.oauth_config[
'token_url'
],
data={
'grant_type'
:
'client_credentials'
,
'client_id'
:
self
.oauth_config[
'client_id'
],
'client_secret'
:
self
.oauth_config[
'client_secret'
],
'scope'
:
self
.oauth_config[
'scopes'
]
}
)
if
response.status_code !=
200
:
raise Exception(f
"OAuth token request failed: {response.text}"
)
token_data = response.json()
self
.access_token = token_data[
'access_token'
]
# Calculate token expiration
expires_in = token_data.get(
'expires_in'
,
3600
)
self
.token_expires_at = current_time + expires_in
return
self
.access_token

async def
_verify_token_scopes
(
self
,
required_scopes
: List[str]) ->
bool
:
""
"Verify the current token has required scopes with proper JWT signature verification."
""
if
not
self
.access_token:
return
False
try
:
# Get the OAuth server's public key for verification
public_key_jwk = await
self
.
get_oauth_public_key
()
if
public_key_jwk:
# Proper JWT verification with signature check
try
:
# Convert JWK to PEM format for PyJWT
from
jwt.algorithms import RSAAlgorithm
public_key = RSAAlgorithm.
from_jwk
(public_key_jwk)
# Verify JWT with full signature validation
payload = jwt.
decode
(
self
.access_token,
key=public_key,
algorithms=[
"RS256"
],
audience=
self
.oauth_config.
get
(
'client_id'
), # Verify audience
issuer=
self
.oauth_config.
get
(
'token_url'
,
''
).
replace
(
'/token'
,
''
) # Verify issuer
)
print
(
"✅ JWT signature verification successful"
)
except jwt.InvalidTokenError
as
e:
print
(f
"❌ JWT signature verification failed: {e}"
)
return
False
else
:
# Fallback to unverified decode if public key unavailable
print
(
"⚠️ Using unverified JWT decode (development only)"
)
payload = jwt.
decode
(
self
.access_token,
options={
"verify_signature"
: False}
)
# Check scopes
token_scopes = payload.get(
'scope'
,
''
).split()
has_required_scopes =
all
(scope
in
token_scopes
for
scope
in
required_scopes)
if
has_required_scopes:
print
(
f"✅ Token has required scopes:
{required_scopes}
"
)
else
:
print
(
f"❌ Token missing scopes. Has:
{token_scopes}
, Needs:
{required_scopes}
"
)
return
has_required_scopes
except
Exception
as
e:
print
(
f"❌ Token verification error:
{e}
"
)
return
False
class
SecureMCPTool
:
"""Wrapper to make MCP tools compatible with DSPy."""
def
__init__
(
self, tool, session, client
):
self.tool = tool
self.session = session
self.client = client
self.name = tool.name
self.description = tool.description
self.input_schema = tool.inputSchema
async
def
__call__
(
self, **kwargs
):
"""Execute the MCP tool with security validation."""
# Verify required scopes
required_scopes = self.client._get_required_scopes(self.name)
if
not
await
self.client._verify_token_scopes(required_scopes):
raise
PermissionError(
f"Insufficient permissions for
{self.name}
"
)
try
:
# Call the MCP tool
result =
await
self.session.call_tool(
self.name,
arguments=kwargs
)
# Handle rate limit responses
if
hasattr
(result,
'error'
)
and
'rate_limit'
in
str
(result.error):
retry_after = result.metadata.get(
'retry_after'
,
60
)
print
(
f"⏳ Rate limited. Waiting
{retry_after}
seconds..."
)
await
asyncio.sleep(retry_after)
# Retry the tool call
result =
await
self.session.call_tool(
self.name,
arguments=kwargs
)
# Extract content from result
if
hasattr
(result,
'content'
)
and
result.content:
return
result.content[
0
].text
if
result.content
else
""
else
:
return
f"Tool
{self.name}
completed successfully"
except
httpx.HTTPStatusError
as
e:
if
e.response.status_code ==
401
:
# Token expired, refresh and retry
self.client.access_token =
None
await
self.client.get_oauth_token()
return
await
self.__call__(**kwargs)
elif
e.response.status_code ==
429
:
# Handle rate limiting
retry_after =
int
(e.response.headers.get(
'Retry-After'
,
60
))
print
(
f"⏳ Rate limited by server. Waiting
{retry_after}
seconds..."
)
await
asyncio.sleep(retry_after)
return
await
self.__call__(**kwargs)
else
:
raise
class
SecureMCPTool
:
"""Wrapper to make MCP tools compatible with DSPy."""
def
__init__
(
self, tool, session, client
):
self.tool = tool
self.session = session
self.client = client
self.name = tool.name
self.description = tool.description
self.input_schema = tool.inputSchema
async
def
__call__
(
self, **kwargs
):
"""Execute the MCP tool with security validation."""
# Verify required scopes
required_scopes = self.client._get_required_scopes(self.name)
if
not
await
self.client._verify_token_scopes(required_scopes):
raise
PermissionError(
f"Insufficient permissions for
{self.name}
"
)
try
:
# Call the MCP tool
result =
await
self.session.call_tool(
self.name,
arguments=kwargs
)
# Handle rate limit responses
if
hasattr
(result,
'error'
)
and
'rate_limit'
in
str
(result.error):
retry_after = result.metadata.get(
'retry_after'
,
60
)
print
(
f"⏳ Rate limited. Waiting
{retry_after}
seconds..."
)
await
asyncio.sleep(retry_after)
# Retry the tool call
result =
await
self.session.call_tool(
self.name,
arguments=kwargs
)
# Extract content from result
if
hasattr
(result,
'content'
)
and
result.content:
return
result.content[
0
].text
if
result.content
else
""
else
:
return
f"Tool
{self.name}
completed successfully"
except
httpx.HTTPStatusError
as
e:
if
e.response.status_code ==
401
:
# Token expired, refresh and retry
self.client.access_token =
None
await
self.client.get_oauth_token()
return
await
self.__call__(**kwargs)
elif
e.response.status_code ==
429
:
# Handle rate limiting
retry_after =
int
(e.response.headers.get(
'Retry-After'
,
60
))
print
(
f"⏳ Rate limited by server. Waiting
{retry_after}
seconds..."
)
await
asyncio.sleep(retry_after)
return
await
self.__call__(**kwargs)
else
:
raise
- If the authentication token has expired (401 error), it automatically gets a new token and tries again without bothering the user.
- If the server indicates we’re making too many requests (429 error), it waits for the specified time before trying again.
- For any other errors, it passes them up the chain so they can be properly handled by the calling code.
- Enforces permission boundaries
- Handles authentication issues transparently
- Manages rate limiting gracefully
- Standardizes tool output for DSPy
- Provides resilience through intelligent retry mechanisms

def _get_required_scopes(
self
, tool_name:
str
)
->
List[
str
]:
""
"Map tool names to required OAuth scopes."
""
scope_mapping = {
"get_customer_info"
: [
"customer:read"
],
"create_support_ticket"
: [
"ticket:create"
],
"calculate_account_value"
: [
"account:calculate"
],
"get_recent_customers"
: [
"customer:read"
]
}
return
scope_mapping.
get
(tool_name, [])
async
def
connect_to_secure_mcp_server
(
self
):
"""Connect to OAuth-protected MCP server."""
# Get fresh access token
access_token =
await
self.get_oauth_token()
# Create custom httpx client factory with our CA bundle
def
custom_httpx_client_factory
(
headers=
None
, timeout=
None
, auth=
None
):
# Get the same CA cert path we use for the main client
ca_cert_path = self.oauth_config.get(
'ca_cert_path'
,
None
)
ssl_cert_file = os.environ.get(
'SSL_CERT_FILE'
)
if
ssl_cert_file
and
os.path.exists(ssl_cert_file):
ca_cert_path = ssl_cert_file
if
os.environ.get(
'DEBUG_SSL'
):
print
(
f"🔐 MCP client using SSL_CERT_FILE:
{ssl_cert_file}
"
)
return
httpx.AsyncClient(
headers=headers,
timeout=timeout
if
timeout
else
httpx.Timeout(
30.0
),
auth=auth,
verify=ca_cert_path
if
ca_cert_path
else
True
,
# Use proper SSL verification
follow_redirects=
True
)
# Create HTTP client with authentication headers and custom SSL verification
http_transport =
await
self.exit_stack.enter_async_context(
streamablehttp_client(
url=self.oauth_config[
'mcp_server_url'
],
headers={
"Authorization"
:
f"Bearer
{access_token}
"
},
httpx_client_factory=custom_httpx_client_factory
)
)
read, write, url_getter = http_transport
session =
await
self.exit_stack.enter_async_context(
ClientSession(read, write)
)
# Initialize with auth headers
await
session.initialize()
self.sessions.append(session)
# Discover available tools
response =
await
session.list_tools()
for
tool
in
response.tools:
self.tool_to_session[tool.name] = session
self.available_tools.append(tool)
async
def
process_scenarios
(
self, scenarios:
List
[
str
]
):
"""Process multiple scenarios with the DSPy agent."""
results = []
for
i, scenario
in
enumerate
(scenarios,
1
):
print
(
f"\n📝 Scenario
{i}
:
{scenario}
"
)
try
:
response =
await
self.process_request(scenario)
print
(
f"🤖 DSPy Agent Response:
{response}
"
)
results.append({
"scenario"
: scenario,
"response"
: response,
"status"
:
"success"
})
except
Exception
as
e:
print
(
f"❌ Error:
{e}
"
)
results.append({
"scenario"
: scenario,
"error"
:
str
(e),
"status"
:
"error"
})
print
(
"─"
*
60
)
return
results

async
def
setup_dspy_agent
(
self
):
"""Set up DSPy with secure MCP tools."""
# Configure DSPy with the appropriate language model
if
self.llm_provider ==
"openai"
:
llm = dspy.LM(
f"openai/
{Config.OPENAI_MODEL}
"
,
api_key=self.api_key
)
elif
self.llm_provider ==
"anthropic"
:
llm = dspy.LM(
f"anthropic/
{Config.ANTHROPIC_MODEL}
"
,
api_key=self.api_key
)
else
:
raise
ValueError(
f"Unsupported LLM provider:
{self.llm_provider}
"
)
dspy.configure(lm=llm)
# Convert MCP tools to DSPy-compatible tools
self.dspy_tools = []
for
tool
in
self.available_tools:
session = self.tool_to_session[tool.name]
dspy_tool = SecureMCPTool(tool, session, self)
self.dspy_tools.append(dspy_tool)
# Create a ReAct agent with the secure tools
self.react_agent = dspy.ReAct(
CustomerServiceSignature,
tools=self.dspy_tools,
max_iters=
5
)
async
def
process_request
(
self
, request:
str
)
->
str
:
""
"Process a customer service request using DSPy ReAct agent."
""
if
not
self
.react_agent:
raise
RuntimeError
(
"Agent not initialized. Call setup_dspy_agent() first."
)
try
:
# Use DSPy
's
async
call method
result =
await
self
.react_agent.
acall
(request=request)
return
result.response
except Exception
as
e:
return
f
"Error processing request: {str(e)}"
- Mass data extraction through repeated API calls
- DoS attacks on backend services
- Violation of privacy regulations through bulk data access
- Scope Enforcement: Each tool required specific scopes, preventing unauthorized data access
- Rate Limiting: Built-in delays prevented API flooding
- Audit Logging: All tool calls were logged, making the pattern visible
- Token Expiration: Limited the window for any potential abuse
# Configure secure HTTP client with TLS verification
ca_cert_path = oauth_config.get(
'ca_cert_path'
,
None
)
# Check for SSL environment variables (used by mkcert script)
ssl_cert_file = os.environ.get(
'SSL_CERT_FILE'
)
if
ssl_cert_file
and
os.path.exists(ssl_cert_file):
ca_cert_path = ssl_cert_file
if
os.environ.get(
'DEBUG_SSL'
):
print
(
f"🔐 Using SSL_CERT_FILE:
{ssl_cert_file}
"
)
# Configure secure HTTP client with TLS verification
self.http_client = httpx.AsyncClient(
verify=ca_cert_path
if
ca_cert_path
else
True
,
timeout=
30.0
)

# Example customer service scenarios
scenarios = [
"For customer ABC123 create a support ticket as the bbq grill that she bought is defective."
,
"Check the account status for customer ABC123 and calculate their total purchase value with amounts [250.0, 175.50, 82.25]"
,
"Calculate account value for customer ABC123 with purchases:
$150
.0,
$300
.0 and
$89
.50"
,
]
- Multi-tool operations: Creating tickets requires customer lookup then ticket creation
- Calculation with data: Tests scope boundaries between read and calculate permissions
- Natural language parsing: Ensures DSPy handles various input formats securely