Files

1675 lines
50 KiB
Python

"""
MCP Email Server - OpenAPI-compatible HTTP server
Features:
- IMAP (login) + SMTP (login)
- List/search all folders (including shared/public)
- New-message tracking and flagging
- Triage guidance (business description + personnel/chain-of-command)
- Extended tools: reply, forward, drafts, move, copy, labels, attachments, schedule_send, export_conversation
- Conflict check search
- CalDAV / CardDAV integration (e.g., Nextcloud) for calendar and contacts
- Exposes OpenAPI 3.1 spec for integration with Open WebUI (OpenAPI tool server)
Run:
- uvicorn server:app --host 0.0.0.0 --port 8000
"""
import base64
import email
import email.policy
import imaplib
import json
import logging
import os
import re
import smtplib
import threading
import time
import uuid
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
# Configure logging once at import time; the LOG_LEVEL environment variable
# (default "INFO") sets the threshold.
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# Logger shared by the helpers in this module.
logger = logging.getLogger("mcp-email-server")
# -------------------- CONFIG --------------------
def _env_flag(name: str, default: str) -> bool:
    """Return True when env var *name* (or *default*) is "true", "1" or "yes"."""
    return os.getenv(name, default).lower() in ("true", "1", "yes")


@dataclass
class EmailConfig:
    """Server settings; every default is read from the environment at import time.

    Fields can still be overridden per instance, e.g.
    ``EmailConfig(PERSONNEL_JSON="[]")``.
    """

    # IMAP
    IMAP_HOST: str = os.getenv("IMAP_HOST", "imap.example.com")
    IMAP_PORT: int = int(os.getenv("IMAP_PORT", "993"))
    IMAP_USE_SSL: bool = _env_flag("IMAP_USE_SSL", "true")
    IMAP_USERNAME: str = os.getenv("IMAP_USERNAME", "")
    IMAP_PASSWORD: str = os.getenv("IMAP_PASSWORD", "")
    # SMTP
    SMTP_HOST: str = os.getenv("SMTP_HOST", "smtp.example.com")
    SMTP_PORT: int = int(os.getenv("SMTP_PORT", "587"))
    SMTP_USE_TLS: bool = _env_flag("SMTP_USE_TLS", "true")
    SMTP_USE_SSL: bool = _env_flag("SMTP_USE_SSL", "false")
    SMTP_USERNAME: str = os.getenv("SMTP_USERNAME", "")
    SMTP_PASSWORD: str = os.getenv("SMTP_PASSWORD", "")
    SMTP_FROM: str = os.getenv("SMTP_FROM", "")
    # Optional cc/bcc defaults (comma-separated addresses)
    DEFAULT_CC: str = os.getenv("DEFAULT_CC", "")
    DEFAULT_BCC: str = os.getenv("DEFAULT_BCC", "")
    # Business/person description
    BUSINESS_DESCRIPTION: str = os.getenv("BUSINESS_DESCRIPTION", "")
    # Personnel / chain-of-command (JSON array of objects)
    PERSONNEL_JSON: str = os.getenv("PERSONNEL_JSON", "[]")
    # Scheduling: seconds between scheduler wake-ups
    SCHEDULED_SEND_INTERVAL: int = int(os.getenv("SCHEDULED_SEND_INTERVAL", "10"))
    # CardDAV / CalDAV (e.g., Nextcloud)
    DAV_BASE_URL: str = os.getenv("DAV_BASE_URL", "")  # e.g. https://cloud.example.com/remote.php/dav
    DAV_USERNAME: str = os.getenv("DAV_USERNAME", "")
    DAV_PASSWORD: str = os.getenv("DAV_PASSWORD", "")
    DAV_VERIFY_TLS: bool = _env_flag("DAV_VERIFY_TLS", "true")

    @property
    def personnel(self) -> List[Dict[str, Any]]:
        """Return PERSONNEL_JSON decoded as a list of dicts.

        Invalid JSON, a non-list top-level value, and non-object entries are
        all dropped so callers can rely on the annotated return type.
        """
        try:
            parsed = json.loads(self.PERSONNEL_JSON)
        except (TypeError, ValueError):
            return []
        if not isinstance(parsed, list):
            return []
        return [entry for entry in parsed if isinstance(entry, dict)]

    @property
    def default_cc_list(self) -> List[str]:
        """Return DEFAULT_CC split on commas, with blanks removed."""
        return [e.strip() for e in self.DEFAULT_CC.split(",") if e.strip()]

    @property
    def default_bcc_list(self) -> List[str]:
        """Return DEFAULT_BCC split on commas, with blanks removed."""
        return [e.strip() for e in self.DEFAULT_BCC.split(",") if e.strip()]


config = EmailConfig()
# -------------------- IMAP HELPERS (UID-based) --------------------
def create_imap():
    """Open a connection to the configured IMAP server (not yet logged in).

    Uses implicit TLS when IMAP_USE_SSL is set; otherwise connects in plain
    text and upgrades with STARTTLS.
    """
    host, port = config.IMAP_HOST, config.IMAP_PORT
    if not config.IMAP_USE_SSL:
        conn = imaplib.IMAP4(host, port)
        conn.starttls()
        return conn
    return imaplib.IMAP4_SSL(host, port)
def login_imap(imap):
    """Authenticate *imap* with the configured IMAP username and password."""
    username = config.IMAP_USERNAME
    password = config.IMAP_PASSWORD
    imap.login(username, password)
def list_all_folders(imap):
    """Return every mailbox reported by LIST as ``[{"path": name}, ...]``.

    Each LIST line looks like ``(flags) "delim" name`` where *name* may be a
    quoted string, a bare atom (commonly ``INBOX``) or an IMAP literal.
    Unparseable or empty entries are skipped.
    """
    list_line = re.compile(
        r'^\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.+)$',
        re.DOTALL,
    )
    _, data = imap.list()
    folders = []
    for item in data or []:
        if item is None:
            # imaplib returns [None] when the server lists nothing.
            continue
        if isinstance(item, tuple):
            # Literal form: the mailbox name arrives as its own chunk.
            name = item[1] if len(item) > 1 else b""
            if isinstance(name, bytes):
                name = name.decode("utf-8", errors="replace")
            if name:
                folders.append({"path": name})
            continue
        if isinstance(item, bytes):
            item = item.decode("utf-8", errors="replace")
        match = list_line.match(item.strip())
        if not match:
            continue
        name = match.group("name").strip()
        if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
            # Strip the quotes and undo backslash escapes inside them.
            name = re.sub(r"\\(.)", r"\1", name[1:-1])
        if name:
            folders.append({"path": name})
    return folders
def ensure_selected(imap, folder: str):
    """Select *folder* read-write on *imap*.

    imaplib sends the mailbox argument verbatim, so names containing spaces or
    other special characters are wrapped in an IMAP quoted string first.

    Raises:
        RuntimeError: if the server does not answer SELECT with OK (imaplib
            itself only raises for BAD, not for NO).
    """
    mailbox = folder
    already_quoted = len(mailbox) >= 2 and mailbox.startswith('"') and mailbox.endswith('"')
    if not already_quoted and re.search(r'[\s(){%*"\\\]]', mailbox):
        escaped = mailbox.replace("\\", "\\\\").replace('"', '\\"')
        mailbox = f'"{escaped}"'
    status, data = imap.select(mailbox, readonly=False)
    if status != "OK":
        raise RuntimeError(f"Failed to select folder {folder!r}: {data}")
def _decode_text_part(part) -> str:
    """Decode a text MIME part to str, tolerating missing payloads and bad charsets."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # The declared charset is unknown to Python; fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def _extract_flags(fetch_data) -> List[str]:
    """Return the flag names from the first FETCH response item carrying FLAGS."""
    for item in fetch_data or []:
        if isinstance(item, tuple):
            item = item[0]
        if isinstance(item, bytes) and b"FLAGS" in item:
            return [f.decode("utf-8", errors="replace") for f in imaplib.ParseFlags(item)]
    return []


def parse_message(imap, uid: int):
    """Fetch message *uid* from the currently selected folder and summarize it.

    Returns a dict with keys uid, subject, from, to, cc, date, body_plain,
    body_html, flags, message_id, in_reply_to and references. In multipart
    messages, parts whose Content-Disposition mentions "attachment" are not
    included in the bodies.

    Raises:
        RuntimeError: if the FETCH does not return the message.
    """
    # NOTE(review): fetching RFC822 (rather than BODY.PEEK[]) normally makes
    # the server set \Seen on a read-write mailbox; confirm that is intended.
    status, msg_data = imap.uid("FETCH", str(uid), "(RFC822)")
    if (
        status != "OK"
        or not msg_data
        or not isinstance(msg_data[0], tuple)
        or len(msg_data[0]) < 2
    ):
        raise RuntimeError(f"Failed to fetch message UID {uid}")
    msg = email.message_from_bytes(msg_data[0][1], policy=email.policy.default)

    def header(name):
        return msg.get(name, "") or ""

    if msg.is_multipart():
        parts = [
            p for p in msg.walk()
            if "attachment" not in (p.get("Content-Disposition") or "").lower()
        ]
    else:
        parts = [msg]

    body_plain = ""
    body_html = ""
    for part in parts:
        ptype = part.get_content_type()
        if ptype == "text/plain":
            body_plain += _decode_text_part(part)
        elif ptype == "text/html":
            body_html += _decode_text_part(part)

    _, flags_data = imap.uid("FETCH", str(uid), "(FLAGS)")
    return {
        "uid": uid,
        "subject": header("Subject"),
        "from": header("From"),
        "to": header("To"),
        "cc": header("Cc"),
        "date": header("Date"),
        "body_plain": body_plain,
        "body_html": body_html,
        "flags": _extract_flags(flags_data),
        "message_id": header("Message-ID"),
        "in_reply_to": header("In-Reply-To"),
        "references": header("References"),
    }
def search_messages(imap, folder: str, query: str, since: Optional[str], max_results: int):
    """Return parsed messages from *folder* matching a subject query.

    Args:
        imap: logged-in IMAP connection.
        folder: mailbox to select and search.
        query: substring to match against Subject; empty means all messages.
        since: optional IMAP date such as ``01-Jan-2024``.
        max_results: at most this many of the highest-UID matches are returned;
            zero or negative yields an empty list.

    Raises:
        ValueError: if *since* is not in ``DD-Mon-YYYY`` form.
    """
    if max_results <= 0:
        # ids[-0:] would otherwise return every match.
        return []
    ensure_selected(imap, folder)
    criteria = []
    if query:
        # Escape for an IMAP quoted string and drop line breaks so the query
        # cannot terminate the command early.
        cleaned = query.replace("\r", " ").replace("\n", " ")
        escaped = cleaned.replace("\\", "\\\\").replace('"', '\\"')
        criteria.append(f'SUBJECT "{escaped}"')
    else:
        criteria.append("ALL")
    if since:
        if not re.fullmatch(r"\d{1,2}-[A-Za-z]{3}-\d{4}", since):
            raise ValueError(f"Invalid IMAP date for 'since': {since!r}")
        criteria.append(f"SINCE {since}")
    search_str = " ".join(criteria)
    status, data = imap.uid("SEARCH", None, search_str)
    if status != "OK" or not data or not data[0]:
        return []
    ids = data[0].split()[-max_results:]
    results = []
    for uid_bytes in ids:
        uid = int(uid_bytes)
        try:
            results.append(parse_message(imap, uid))
        except Exception as e:
            logger.error(f"Error parsing message UID {uid}: {e}")
    return results
def mark_as_read(imap, folder: str, uid: int):
    """Select *folder* and add the ``\\Seen`` flag to message *uid*."""
    ensure_selected(imap, folder)
    store_args = ("STORE", str(uid), "+FLAGS", "\\Seen")
    imap.uid(*store_args)
def flag_message(imap, folder: str, uid: int):
    """Select *folder* and add the ``\\Flagged`` flag to message *uid*."""
    ensure_selected(imap, folder)
    store_args = ("STORE", str(uid), "+FLAGS", "\\Flagged")
    imap.uid(*store_args)
def move_message(imap, folder: str, uid: int, dest_folder: str):
    """Move message *uid* from *folder* to *dest_folder* (copy, then delete).

    The source is only flagged deleted after the server confirms the copy, so
    a failed COPY can no longer lose the message.

    Raises:
        RuntimeError: if COPY or the STORE of ``\\Deleted`` is not answered OK.
    """
    ensure_selected(imap, folder)
    uid_str = str(uid)
    status, data = imap.uid("COPY", uid_str, dest_folder)
    if status != "OK":
        raise RuntimeError(f"Failed to copy UID {uid} to {dest_folder!r}: {data}")
    status, data = imap.uid("STORE", uid_str, "+FLAGS", "\\Deleted")
    if status != "OK":
        raise RuntimeError(f"Failed to mark UID {uid} as deleted: {data}")
    if "UIDPLUS" in (getattr(imap, "capabilities", None) or ()):
        # UID EXPUNGE removes only this message, not every \Deleted one.
        imap.uid("EXPUNGE", uid_str)
    else:
        imap.expunge()
def copy_message(imap, folder: str, uid: int, dest_folder: str):
    """Select *folder* and issue UID COPY of message *uid* to *dest_folder*."""
    ensure_selected(imap, folder)
    source_uid = str(uid)
    imap.uid("COPY", source_uid, dest_folder)
def add_flags(imap, folder: str, uid: int, flags: List[str]):
    """Add *flags* to message *uid* in *folder*; an empty list is a no-op.

    imaplib only accepts str/bytes arguments, so the flags are sent as one
    parenthesized, space-separated list.
    """
    if not flags:
        return
    ensure_selected(imap, folder)
    flag_list = "(" + " ".join(flags) + ")"
    imap.uid("STORE", str(uid), "+FLAGS", flag_list)
def remove_flags(imap, folder: str, uid: int, flags: List[str]):
    """Remove *flags* from message *uid* in *folder*; an empty list is a no-op.

    imaplib only accepts str/bytes arguments, so the flags are sent as one
    parenthesized, space-separated list.
    """
    if not flags:
        return
    ensure_selected(imap, folder)
    flag_list = "(" + " ".join(flags) + ")"
    imap.uid("STORE", str(uid), "-FLAGS", flag_list)
def get_labels(imap, folder: str, uid: int):
    """Return the ``$Label_``-prefixed keywords set on message *uid* in *folder*.

    Custom labels are stored as IMAP keywords starting with "$Label_"; the
    prefix is kept in the returned names.
    """
    # The folder must be selected before UID FETCH can address the message.
    ensure_selected(imap, folder)
    _, flags_data = imap.uid("FETCH", str(uid), "(FLAGS)")
    for item in flags_data or []:
        if isinstance(item, tuple):
            item = item[0]
        if not isinstance(item, bytes) or b"FLAGS" not in item:
            continue
        flags = [f.decode("utf-8", errors="replace") for f in imaplib.ParseFlags(item)]
        return [f for f in flags if f.startswith("$Label_")]
    return []
def list_available_labels(imap):
    """Discover labels in use by sampling messages (best-effort heuristic).

    Scans at most the first 20 folders and the first 20 UIDs of each, and
    returns the sorted label names with the "$Label_" prefix removed. Folders
    that cannot be scanned are skipped.
    """
    labels = set()
    try:
        folders = list_all_folders(imap)
    except Exception as e:
        logger.warning(f"Label discovery could not list folders: {e}")
        return []
    for f in folders[:20]:  # limit scan
        path = f["path"]
        try:
            ensure_selected(imap, path)
            status, data = imap.uid("SEARCH", None, "ALL")
            if status != "OK" or not data or not data[0]:
                continue
            # Sample the first 20 UIDs and fetch their flags in one round trip.
            sample = [u.decode("ascii") for u in data[0].split()[:20]]
            status, flags_data = imap.uid("FETCH", ",".join(sample), "(FLAGS)")
            if status != "OK":
                continue
            for item in flags_data or []:
                if isinstance(item, tuple):
                    item = item[0]
                if not isinstance(item, bytes):
                    continue
                for raw_flag in imaplib.ParseFlags(item):
                    fl = raw_flag.decode("utf-8", errors="replace")
                    if fl.startswith("$Label_"):
                        labels.add(fl.replace("$Label_", "", 1))
        except Exception as e:
            logger.debug(f"Label discovery skipped folder {path}: {e}")
            continue
    return sorted(labels)
# -------------------- SMTP HELPERS --------------------
def send_email(
    to: List[str],
    subject: str,
    body: str,
    cc: Optional[List[str]] = None,
    bcc: Optional[List[str]] = None,
    html: bool = False,
    in_reply_to: Optional[str] = None,
    references: Optional[str] = None,
):
    """Send one message through the configured SMTP server.

    Configured default CC/BCC addresses are merged in (duplicates removed,
    order preserved). BCC recipients are only part of the SMTP envelope and
    never appear in a header.

    Raises:
        ValueError: if SMTP_FROM is not configured or there are no recipients.
        smtplib.SMTPException: on connection, login or delivery failure.
    """
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.utils import parseaddr

    if not config.SMTP_FROM:
        raise ValueError("SMTP_FROM not configured")
    # SMTP_FROM may be "Name <addr>"; use the bare address for the envelope
    # and for the Message-ID domain.
    sender_addr = parseaddr(config.SMTP_FROM)[1] or config.SMTP_FROM
    _, at_sign, domain = sender_addr.rpartition("@")
    if not at_sign or not domain:
        domain = "localhost"

    cc = list(dict.fromkeys(list(cc or []) + config.default_cc_list))
    bcc = list(dict.fromkeys(list(bcc or []) + config.default_bcc_list))
    all_recipients = list(to) + cc + bcc
    if not all_recipients:
        raise ValueError("No recipients specified")

    if html:
        m = MIMEMultipart("alternative")
        m.attach(MIMEText(body, "html"))
    else:
        m = MIMEText(body, "plain")
    m["From"] = config.SMTP_FROM
    m["To"] = ", ".join(to)
    if cc:
        m["Cc"] = ", ".join(cc)
    m["Subject"] = subject
    m["Message-ID"] = f"<{uuid.uuid4().hex}@{domain}>"
    if in_reply_to:
        m["In-Reply-To"] = in_reply_to
    if references:
        m["References"] = references

    smtp_cls = smtplib.SMTP_SSL if config.SMTP_USE_SSL else smtplib.SMTP
    # The context manager closes the connection even if login or send fails.
    with smtp_cls(config.SMTP_HOST, config.SMTP_PORT) as s:
        if not config.SMTP_USE_SSL and config.SMTP_USE_TLS:
            s.starttls()
        s.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
        s.sendmail(sender_addr, all_recipients, m.as_string())
# -------------------- NEW-MESSAGE TRACKING --------------------
# Per-folder set of UIDs already reported; kept in memory only.
last_seen_uids: Dict[str, set] = {}


def get_new_messages(imap, folder: str):
    """Return parsed messages in *folder* whose UIDs were not reported before.

    The remembered UID set is replaced only after every new message has been
    parsed, so a failure while parsing leaves those messages to be reported on
    the next call.
    """
    ensure_selected(imap, folder)
    status, data = imap.uid("SEARCH", None, "ALL")
    if status != "OK" or not data or not data[0]:
        return []
    uids = {int(u) for u in data[0].split()}
    prev = last_seen_uids.get(folder, set())
    results = [parse_message(imap, uid) for uid in sorted(uids - prev)]
    last_seen_uids[folder] = uids
    return results
def sync_seen(imap, folder: str):
    """Record every UID currently in *folder* as already seen.

    An unsuccessful or empty search resets the folder's set to empty.
    """
    ensure_selected(imap, folder)
    status, data = imap.uid("SEARCH", None, "ALL")
    found = status == "OK" and data and data[0]
    last_seen_uids[folder] = {int(u) for u in data[0].split()} if found else set()
# -------------------- TRIAGE HELPERS --------------------
def build_triage_hint(msg: Dict[str, Any]) -> str:
    """Build a one-line triage hint for *msg* from the configured context.

    The hint combines the business description, details of the sender when
    their address matches a personnel entry, and an urgency note when the
    subject contains an urgency keyword. Returns "" when neither a business
    description nor personnel are configured.
    """
    from email.utils import parseaddr

    personnel = config.personnel  # decode the JSON once per call
    if not config.BUSINESS_DESCRIPTION and not personnel:
        return ""
    hint_parts = []
    if config.BUSINESS_DESCRIPTION:
        hint_parts.append(f"Context: {config.BUSINESS_DESCRIPTION}")
    sender_email = parseaddr(msg.get("from", ""))[1].lower()
    personnel_by_email = {
        p["email"].lower(): p
        for p in personnel
        if isinstance(p, dict) and isinstance(p.get("email"), str) and p["email"]
    }
    sender_info = personnel_by_email.get(sender_email)
    if sender_info:
        # Entries come from user-supplied JSON, so name/role may be missing.
        name = sender_info.get("name") or sender_email
        role = sender_info.get("role")
        described = f"{name} ({role})" if role else f"{name}"
        hint_parts.append(f"Sender is internal: {described}.")
        if sender_info.get("escalates_to"):
            hint_parts.append(
                f"If escalation needed, escalate to: {sender_info['escalates_to']}."
            )
    subject = (msg.get("subject") or "").lower()
    if any(w in subject for w in ["urgent", "asap", "escalat", "priority"]):
        hint_parts.append("Message appears urgent; consider escalation.")
    return " ".join(hint_parts)
# -------------------- CONFLICT CHECK SEARCH --------------------
def conflict_check_search(imap, terms: List[str], folders: Optional[List[str]] = None, max_results: int = 100):
    """
    Search emails across specified folders (or INBOX if not provided)
    for messages mentioning any of the given terms (names, firms, counsel).
    Useful for conflict checks.

    Uses the IMAP TEXT criterion so a term is matched in headers and body,
    not only in the subject. Each (folder, uid) is reported once, tagged with
    the first term that matched it. *max_results* limits the hits taken per
    term and folder; zero or negative yields an empty list.
    """
    if not folders:
        folders = ["INBOX"]
    if max_results <= 0:
        # ids[-0:] would otherwise return every hit.
        return []
    seen = set()
    unique = []
    for folder in folders:
        try:
            ensure_selected(imap, folder)
        except Exception as e:
            logger.warning(f"Conflict search skipped folder {folder}: {e}")
            continue
        for term in terms:
            if not term or not term.strip():
                continue
            # Escape for an IMAP quoted string.
            cleaned = term.replace("\r", " ").replace("\n", " ")
            escaped = cleaned.replace("\\", "\\\\").replace('"', '\\"')
            try:
                status, data = imap.uid("SEARCH", None, f'TEXT "{escaped}"')
                if status != "OK" or not data or not data[0]:
                    continue
                for uid_bytes in data[0].split()[-max_results:]:
                    uid = int(uid_bytes)
                    key = (folder, uid)
                    if key in seen:
                        # Already reported for an earlier term; skip the re-fetch.
                        continue
                    msg = parse_message(imap, uid)
                    seen.add(key)
                    unique.append({
                        "folder": folder,
                        "uid": uid,
                        "subject": msg["subject"],
                        "from": msg["from"],
                        "to": msg["to"],
                        "cc": msg["cc"],
                        "date": msg["date"],
                        "matched_term": term,
                        "snippet": (msg["body_plain"] or "")[:400],
                    })
            except Exception as e:
                logger.error(f"Conflict search error for term '{term}' in {folder}: {e}")
    return unique
# -------------------- ATTACHMENTS --------------------
def list_attachments(imap, folder: str, uid: int) -> List[Dict[str, Any]]:
    """Describe the attachments of message *uid* in *folder*.

    A part counts as an attachment when its Content-Disposition contains
    "attachment". Returns [] if the message cannot be fetched.
    """
    ensure_selected(imap, folder)
    status, msg_data = imap.uid("FETCH", str(uid), "(RFC822)")
    fetched = status == "OK" and msg_data and msg_data[0] is not None
    if not fetched:
        return []
    message = email.message_from_bytes(msg_data[0][1], policy=email.policy.default)
    return [
        {
            "filename": part.get_filename() or "attachment",
            "content_type": part.get_content_type(),
            "size": len(part.get_payload(decode=True) or b""),
        }
        for part in message.walk()
        if "attachment" in (part.get("Content-Disposition") or "").lower()
    ]
def download_attachment(imap, folder: str, uid: int, filename: str) -> Dict[str, Any]:
    """Return the first attachment of message *uid* named *filename*, base64-encoded.

    Raises:
        ValueError: if the message cannot be fetched or has no such attachment.
    """
    ensure_selected(imap, folder)
    status, msg_data = imap.uid("FETCH", str(uid), "(RFC822)")
    fetched = status == "OK" and msg_data and msg_data[0] is not None
    if not fetched:
        raise ValueError("Failed to fetch message for attachment")
    message = email.message_from_bytes(msg_data[0][1], policy=email.policy.default)
    attachment_parts = (
        p for p in message.walk()
        if "attachment" in (p.get("Content-Disposition") or "").lower()
    )
    for part in attachment_parts:
        part_filename = part.get_filename() or "attachment"
        if part_filename != filename:
            continue
        payload = part.get_payload(decode=True) or b""
        return {
            "filename": part_filename,
            "content_type": part.get_content_type(),
            "size": len(payload),
            "content_base64": base64.b64encode(payload).decode("ascii"),
        }
    raise ValueError(f"Attachment not found: {filename}")
def search_attachments(imap, folder: str, file_pattern: str, max_results: int):
    """Find messages in *folder* with an attachment whose filename contains *file_pattern*.

    Matching is case-insensitive; an empty pattern matches any attachment.
    Only the last ``max_results * 5`` UIDs are scanned, and each message
    contributes at most one result.
    """
    ensure_selected(imap, folder)
    status, data = imap.uid("SEARCH", None, "ALL")
    if status != "OK" or not data or not data[0]:
        return []
    # Scan more messages than requested so enough matches can be found.
    candidates = data[0].split()[-max_results * 5:]
    needle = file_pattern.lower()
    results = []
    for uid_bytes in candidates:
        if len(results) >= max_results:
            break
        uid = int(uid_bytes)
        try:
            status, msg_data = imap.uid("FETCH", str(uid), "(RFC822)")
            if status != "OK" or not msg_data or msg_data[0] is None:
                continue
            message = email.message_from_bytes(msg_data[0][1], policy=email.policy.default)
            hit = next(
                (
                    p for p in message.walk()
                    if "attachment" in (p.get("Content-Disposition") or "").lower()
                    and (not needle or needle in (p.get_filename() or "").lower())
                ),
                None,
            )
            if hit is not None:
                results.append({
                    "uid": uid,
                    "subject": message.get("Subject", ""),
                    "from": message.get("From", ""),
                    "date": message.get("Date", ""),
                    "filename": hit.get_filename() or "attachment",
                })
        except Exception as e:
            logger.error(f"Error scanning message UID {uid}: {e}")
    return results
# -------------------- DRAFTS (IMAP Drafts folder) --------------------
def save_draft_to_imap(imap, folder: str, to: List[str], subject: str, body: str, html: bool = False):
    """Append a new draft message to *folder*, flagged ``\\Draft``.

    Returns ``{"status": "draft_saved", "folder": folder}``.

    Raises:
        RuntimeError: if the server does not answer APPEND with OK.
    """
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.utils import parseaddr

    # SMTP_FROM may be empty or "Name <addr>"; derive a usable Message-ID domain.
    sender_addr = parseaddr(config.SMTP_FROM)[1]
    _, at_sign, domain = sender_addr.rpartition("@")
    if not at_sign or not domain:
        domain = "localhost"

    m = MIMEMultipart()
    m["From"] = config.SMTP_FROM
    m["To"] = ", ".join(to)
    m["Subject"] = subject
    m["Message-ID"] = f"<{uuid.uuid4().hex}@{domain}>"
    m.attach(MIMEText(body, "html" if html else "plain"))
    ensure_selected(imap, folder)
    status, data = imap.append(folder, "\\Draft", None, m.as_string().encode("utf-8"))
    if status != "OK":
        raise RuntimeError(f"Failed to save draft to {folder!r}: {data}")
    return {"status": "draft_saved", "folder": folder}
# -------------------- SCHEDULED SEND (in-memory) --------------------
# Pending scheduled messages keyed by task id; held in memory only.
scheduled_sends: Dict[str, Dict[str, Any]] = {}
# Guards every access to scheduled_sends.
schedule_lock = threading.Lock()


def add_scheduled_send(to, subject, body, cc, bcc, html, send_at, in_reply_to, references):
    """Queue a message for sending at Unix time *send_at* and return its task id."""
    task_id = uuid.uuid4().hex
    task = dict(
        to=to,
        subject=subject,
        body=body,
        cc=cc,
        bcc=bcc,
        html=html,
        send_at=float(send_at),
        in_reply_to=in_reply_to,
        references=references,
    )
    with schedule_lock:
        scheduled_sends[task_id] = task
    return task_id
def scheduled_send_loop():
    """Run forever, sending queued messages whose send time has passed.

    A task is removed from the queue before sending; a failed send is logged
    and the task is not retried.
    """
    while True:
        cutoff = time.time()
        with schedule_lock:
            due_ids = [
                task_id
                for task_id, task in scheduled_sends.items()
                if task["send_at"] <= cutoff
            ]
        for task_id in due_ids:
            with schedule_lock:
                task = scheduled_sends.pop(task_id, None)
            if not task:
                continue
            try:
                send_email(
                    to=task["to"],
                    subject=task["subject"],
                    body=task["body"],
                    cc=task["cc"],
                    bcc=task["bcc"],
                    html=task["html"],
                    in_reply_to=task.get("in_reply_to"),
                    references=task.get("references"),
                )
            except Exception as e:
                logger.error(f"Scheduled send error for {task_id}: {e}")
        time.sleep(config.SCHEDULED_SEND_INTERVAL)


# Start the scheduler as a daemon thread when the module is imported.
scheduler_thread = threading.Thread(target=scheduled_send_loop, daemon=True)
scheduler_thread.start()
# -------------------- EXPORT CONVERSATION --------------------
def export_conversation(imap, folder: str, message_uid: int, max_messages: int):
    """Export the thread around *message_uid* in *folder* as plain text.

    Candidates are found by subject (with leading Re:/Fw:/Fwd: prefixes
    removed, so a reply can locate the original) and kept when they are
    linked to the root through Message-ID, In-Reply-To or References. At most
    *max_messages* messages are exported. Returns an error string if the root
    message cannot be fetched.
    """
    # Basic: fetch messages with same subject and thread-like headers in same folder.
    ensure_selected(imap, folder)
    status, msg_data = imap.uid("FETCH", str(message_uid), "(RFC822)")
    if status != "OK" or not msg_data or msg_data[0] is None:
        return "Failed to fetch root message."
    root = email.message_from_bytes(msg_data[0][1], policy=email.policy.default)
    subject = (root.get("Subject", "") or "").strip()
    msg_id = (root.get("Message-ID", "") or "").strip()
    refs = set((root.get("References", "") or "").split())
    in_reply = (root.get("In-Reply-To", "") or "").strip()
    if in_reply:
        refs.add(in_reply)

    # SUBJECT is a substring match, so searching for the bare subject also
    # finds "Re: ..." and "Fwd: ..." variants.
    base_subject = re.sub(
        r"^(?:\s*(?:re|fw|fwd)\s*:\s*)+", "", subject, flags=re.IGNORECASE
    ).strip()
    if base_subject:
        escaped = base_subject.replace("\\", "\\\\").replace('"', '\\"')
        search_criterion = f'SUBJECT "{escaped}"'
    else:
        # No usable subject: rely on the header matching below.
        search_criterion = "ALL"
    status, data = imap.uid("SEARCH", None, search_criterion)
    if status != "OK" or not data or not data[0]:
        ids = []
    else:
        ids = data[0].split()

    thread_uids = []
    for uid_bytes in ids:
        uid = int(uid_bytes)
        if len(thread_uids) >= max_messages:
            break
        status, mdata = imap.uid("FETCH", str(uid), "(RFC822)")
        if status != "OK" or not mdata or mdata[0] is None:
            continue
        m = email.message_from_bytes(mdata[0][1], policy=email.policy.default)
        mid = (m.get("Message-ID", "") or "").strip()
        mrefs = set((m.get("References", "") or "").split())
        mirt = (m.get("In-Reply-To", "") or "").strip()
        if uid == message_uid or mid == msg_id:
            thread_uids.append(uid)
        elif refs and mrefs & refs:
            thread_uids.append(uid)
        elif mirt and (mirt == msg_id or mirt in refs):
            thread_uids.append(uid)
        elif mid and (mid in refs or mid == in_reply):
            thread_uids.append(uid)

    lines = []
    for uid in thread_uids:
        try:
            msg = parse_message(imap, uid)
        except Exception:
            continue
        lines.append(f"From: {msg['from']}")
        lines.append(f"To: {msg['to']}")
        lines.append(f"Date: {msg['date']}")
        lines.append(f"Subject: {msg['subject']}")
        lines.append("")
        lines.append(msg["body_plain"].strip())
        lines.append("\n---\n")
    return "\n".join(lines)
# -------------------- CARDdav / CALdav HELPERS --------------------
# Namespace prefix map for ElementTree lookups of WebDAV elements.
DAV_NS = {"D": "DAV:"}


def dav_session():
    """Return a requests session carrying the configured DAV credentials.

    Raises:
        ValueError: if the DAV base URL, username or password is not set.
    """
    configured = config.DAV_BASE_URL and config.DAV_USERNAME and config.DAV_PASSWORD
    if not configured:
        raise ValueError("DAV_BASE_URL, DAV_USERNAME, or DAV_PASSWORD not configured")
    session = requests.Session()
    session.auth = (config.DAV_USERNAME, config.DAV_PASSWORD)
    session.verify = config.DAV_VERIFY_TLS
    return session
def dav_request(s, method, url, data=None, headers=None, timeout=20):
    """Send one request on session *s* and return the response.

    Any exception raised while sending is logged and then re-raised.
    """
    request_headers = headers or {}
    try:
        return s.request(method, url, data=data, headers=request_headers, timeout=timeout)
    except Exception as e:
        logger.error(f"DAV request error: {method} {url}: {e}")
        raise
def parse_dav_propfind(xml_text):
    """
    Parse a PROPFIND response and return list of {href, props}.
    props is a dict of {local-name: text}.

    Properties whose value is child elements rather than text (notably
    ``resourcetype``) are reported as the space-separated local names of
    those children, e.g. "collection addressbook". Only the first propstat of
    each response is read. Unparseable XML yields [].
    """
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return []

    def local_name(tag):
        # ElementTree spells qualified names as "{namespace}local".
        return tag.rsplit("}", 1)[-1]

    results = []
    for response in root.findall(".//{DAV:}response"):
        href = (response.findtext("{DAV:}href") or "").strip()
        if not href:
            continue
        props = {}
        prop_el = response.find("{DAV:}propstat/{DAV:}prop")
        if prop_el is not None:
            for child in prop_el:
                text = (child.text or "").strip()
                if not text and len(child):
                    text = " ".join(local_name(sub.tag) for sub in child)
                props[local_name(child.tag)] = text
        results.append({"href": href, "props": props})
    return results
# CardDAV helpers
def carddav_base_url():
    """Return the CardDAV collection root: the DAV base URL plus "/cards/"."""
    base = config.DAV_BASE_URL.rstrip("/")
    return base + "/cards/"
def list_carddav_addressbooks():
    """List address books found one level below the CardDAV root.

    An entry is kept when its href contains "/cards/", ends with "/", and
    either its resourcetype or its href mentions "addressbook". Returns a
    list of ``{"href", "name"}`` dicts; name falls back to the href.
    """
    propfind_body = """<?xml version="1.0"?>
<D:propfind xmlns:D="DAV:">
<D:prop>
<D:resourcetype/>
<D:displayname/>
</D:prop>
</D:propfind>"""
    session = dav_session()
    response = dav_request(
        session,
        "PROPFIND",
        carddav_base_url(),
        headers={"Depth": "1", "Content-Type": "application/xml"},
        data=propfind_body,
    )
    response.raise_for_status()
    addressbooks = []
    for entry in parse_dav_propfind(response.text):
        href = entry["href"]
        props = entry.get("props", {})
        # Check resource type for addressbook
        resource_type = props.get("resourcetype", "")
        if "/cards/" not in href or not href.endswith("/"):
            continue
        if "addressbook" not in resource_type.lower() and "addressbook" not in href.lower():
            continue
        addressbooks.append({
            "href": href,
            "name": props.get("displayname", href),
        })
    return addressbooks
def _xml_escape(s: str) -> str:
s = s.replace("&", "&amp;")
s = s.replace("<", "&lt;")
s = s.replace(">", "&gt;")
s = s.replace('"', "&quot;")
s = s.replace("'", "&apos;")
return s
def search_carddav_contacts(addressbook_href: str, query: str):
    """Search an address book for contacts whose FN contains *query*.

    Sends a CardDAV ``addressbook-query`` REPORT (RFC 6352) and returns a list
    of ``{"href", "vcard"}`` dicts. The vCard text is taken from the REPORT
    response when the server includes it; otherwise it is fetched with GET.
    *addressbook_href* may be absolute or a server-relative path.
    """
    from urllib.parse import urljoin

    s = dav_session()
    # Server-supplied hrefs are usually relative paths; requests needs full URLs.
    report_url = urljoin(config.DAV_BASE_URL.rstrip("/") + "/", addressbook_href)
    safe_query = _xml_escape(query)
    req_body = f"""<?xml version="1.0" encoding="utf-8"?>
<C:addressbook-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:carddav">
  <D:prop>
    <D:getetag/>
    <C:address-data/>
  </D:prop>
  <C:filter>
    <C:prop-filter name="FN">
      <C:text-match collation="i;unicode-casemap" match-type="contains">{safe_query}</C:text-match>
    </C:prop-filter>
  </C:filter>
</C:addressbook-query>"""
    r = dav_request(
        s,
        "REPORT",
        report_url,
        headers={"Content-Type": "application/xml; charset=utf-8", "Depth": "1"},
        data=req_body.encode("utf-8"),
    )
    r.raise_for_status()
    try:
        # Parse namespace-aware: servers choose their own prefixes (D:, d:, ...).
        root = ET.fromstring(r.content)
    except ET.ParseError as e:
        logger.error(f"Unparseable CardDAV REPORT response from {report_url}: {e}")
        return []
    contacts = []
    for response in root.iter("{DAV:}response"):
        href = (response.findtext("{DAV:}href") or "").strip()
        if not href:
            continue
        vcard = response.findtext(".//{urn:ietf:params:xml:ns:carddav}address-data")
        if vcard is None:
            try:
                vr = dav_request(s, "GET", urljoin(report_url, href))
                vr.raise_for_status()
                vcard = vr.text
            except Exception as e:
                logger.error(f"Error fetching vCard {href}: {e}")
                continue
        contacts.append({
            "href": href,
            "vcard": vcard,
        })
    return contacts
def get_carddav_contact(href: str):
    """GET the vCard at *href* and return ``{"href", "vcard"}``; raises on HTTP errors."""
    session = dav_session()
    response = dav_request(session, "GET", href)
    response.raise_for_status()
    return dict(href=href, vcard=response.text)
def create_carddav_contact(addressbook_href: str, vcard: str):
    """PUT *vcard* under a new random ``.vcf`` name in the address book.

    Returns ``{"href": url}`` with the URL that was written; raises on HTTP errors.
    """
    session = dav_session()
    resource_name = f"{uuid.uuid4().hex}.vcf"
    target = "/".join((addressbook_href.rstrip("/"), resource_name))
    response = dav_request(
        session,
        "PUT",
        target,
        data=vcard.encode("utf-8"),
        headers={"Content-Type": "text/vcard"},
    )
    response.raise_for_status()
    return {"href": target}
def update_carddav_contact(href: str, vcard: str):
    """Overwrite the vCard at *href* with *vcard*; raises on HTTP errors."""
    session = dav_session()
    encoded = vcard.encode("utf-8")
    response = dav_request(
        session, "PUT", href, data=encoded, headers={"Content-Type": "text/vcard"}
    )
    response.raise_for_status()
    return {"href": href}
def delete_carddav_contact(href: str):
    """DELETE the contact resource at *href*; raises on HTTP errors."""
    session = dav_session()
    response = dav_request(session, "DELETE", href)
    response.raise_for_status()
    return dict(href=href)
# CalDAV helpers
def caldav_base_url():
    """Return the CalDAV collection root: the DAV base URL plus "/calendars/"."""
    base = config.DAV_BASE_URL.rstrip("/")
    return base + "/calendars/"
def list_caldav_calendars():
    """List calendars found one level below the CalDAV root.

    An entry is kept when its href contains "/calendars/", ends with "/", and
    either its resourcetype or its href mentions "calendar". Returns a list of
    ``{"href", "name"}`` dicts; name falls back to the last path segment.
    """
    propfind_body = """<?xml version="1.0"?>
<D:propfind xmlns:D="DAV:">
<D:prop>
<D:resourcetype/>
<D:displayname/>
</D:prop>
</D:propfind>"""
    session = dav_session()
    response = dav_request(
        session,
        "PROPFIND",
        caldav_base_url(),
        headers={"Depth": "1", "Content-Type": "application/xml"},
        data=propfind_body,
    )
    response.raise_for_status()
    calendars = []
    for entry in parse_dav_propfind(response.text):
        href = entry["href"]
        props = entry.get("props", {})
        resource_type = props.get("resourcetype", "")
        if "/calendars/" not in href or not href.endswith("/"):
            continue
        if "calendar" not in resource_type.lower() and "calendar" not in href.lower():
            continue
        calendars.append({
            "href": href,
            "name": props.get("displayname", href.split("/")[-2]),
        })
    return calendars
def search_caldav_events(calendar_href: str, start: str, end: str):
    """Return events in the calendar that overlap the range *start*..*end*.

    Sends a CalDAV ``calendar-query`` REPORT (RFC 4791) and returns a list of
    ``{"href", "ical"}`` dicts. The iCalendar text is taken from the REPORT
    response when the server includes it; otherwise it is fetched with GET.
    *calendar_href* may be absolute or a server-relative path.

    Raises:
        ValueError: if *start* or *end* is not a UTC timestamp of the form
            ``20251201T000000Z``.
    """
    from urllib.parse import urljoin

    # Validating the format also keeps the values safe inside XML attributes.
    for label, value in (("start", start), ("end", end)):
        if not re.fullmatch(r"\d{8}T\d{6}Z", value or ""):
            raise ValueError(f"Invalid {label} timestamp (expected e.g. 20251201T000000Z): {value!r}")

    s = dav_session()
    # Server-supplied hrefs are usually relative paths; requests needs full URLs.
    report_url = urljoin(config.DAV_BASE_URL.rstrip("/") + "/", calendar_href)
    req_body = f"""<?xml version="1.0" encoding="utf-8"?>
<C:calendar-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
  <D:prop>
    <D:getetag/>
    <C:calendar-data/>
  </D:prop>
  <C:filter>
    <C:comp-filter name="VCALENDAR">
      <C:comp-filter name="VEVENT">
        <C:time-range start="{start}" end="{end}"/>
      </C:comp-filter>
    </C:comp-filter>
  </C:filter>
</C:calendar-query>"""
    r = dav_request(
        s,
        "REPORT",
        report_url,
        headers={"Content-Type": "application/xml; charset=utf-8", "Depth": "1"},
        data=req_body.encode("utf-8"),
    )
    r.raise_for_status()
    try:
        # Parse namespace-aware: servers choose their own prefixes (D:, d:, ...).
        root = ET.fromstring(r.content)
    except ET.ParseError as e:
        logger.error(f"Unparseable CalDAV REPORT response from {report_url}: {e}")
        return []
    events = []
    for response in root.iter("{DAV:}response"):
        href = (response.findtext("{DAV:}href") or "").strip()
        if not href:
            continue
        ical = response.findtext(".//{urn:ietf:params:xml:ns:caldav}calendar-data")
        if ical is None:
            try:
                vr = dav_request(s, "GET", urljoin(report_url, href))
                vr.raise_for_status()
                ical = vr.text
            except Exception as e:
                logger.error(f"Error fetching event {href}: {e}")
                continue
        events.append({
            "href": href,
            "ical": ical,
        })
    return events
def create_caldav_event(calendar_href: str, ical: str):
    """PUT *ical* under a new random ``.ics`` name in the calendar.

    Returns ``{"href": url}`` with the URL that was written; raises on HTTP errors.
    """
    session = dav_session()
    resource_name = f"{uuid.uuid4().hex}.ics"
    target = "/".join((calendar_href.rstrip("/"), resource_name))
    response = dav_request(
        session,
        "PUT",
        target,
        data=ical.encode("utf-8"),
        headers={"Content-Type": "text/calendar"},
    )
    response.raise_for_status()
    return {"href": target}
def update_caldav_event(href: str, ical: str):
    """Overwrite the event at *href* with *ical*; raises on HTTP errors."""
    session = dav_session()
    encoded = ical.encode("utf-8")
    response = dav_request(
        session, "PUT", href, data=encoded, headers={"Content-Type": "text/calendar"}
    )
    response.raise_for_status()
    return {"href": href}
def delete_caldav_event(href: str):
    """DELETE the event resource at *href*; raises on HTTP errors."""
    session = dav_session()
    response = dav_request(session, "DELETE", href)
    response.raise_for_status()
    return dict(href=href)
# -------------------- FASTAPI / OPENAPI ENDPOINTS --------------------
# The FastAPI application; the endpoints below register on it with @app.post.
app = FastAPI(
title="MCP Email Server",
description="Email assistant MCP server exposing IMAP/SMTP, CalDAV/CardDAV, and conflict-check operations via OpenAPI.",
version="1.0.0",
)
# Request models
# Empty request body for /list_folders.
class ListFoldersRequest(BaseModel):
    pass


# Subject search within one folder.
class SearchMessagesRequest(BaseModel):
    folder: str = Field(default="INBOX", description="IMAP folder to search")
    query: str = Field(default="", description="Search query (subject)")
    since: Optional[str] = Field(
        default=None,
        description="Optional IMAP SINCE date, e.g. '01-Jan-2024'",
    )
    max_results: int = Field(default=50, description="Max results to return")


# Folder to check for messages not reported before.
class GetNewMessagesRequest(BaseModel):
    folder: str = Field(default="INBOX", description="IMAP folder to check")


# Folder whose current UIDs should be recorded as seen.
class SyncSeenRequest(BaseModel):
    folder: str = Field(default="INBOX", description="IMAP folder")


# Message (folder + UID) to mark as read.
class MarkAsReadRequest(BaseModel):
    folder: str
    uid: int


# Message (folder + UID) to flag.
class FlagMessageRequest(BaseModel):
    folder: str
    uid: int


# Message to move and the destination folder.
class MoveMessageRequest(BaseModel):
    folder: str
    uid: int
    dest_folder: str


# Message to copy and the destination folder.
class CopyMessageRequest(BaseModel):
    folder: str
    uid: int
    dest_folder: str


# Label to apply to or remove from a message.
class ManageLabelRequest(BaseModel):
    folder: str
    uid: int
    label: str  # without "$Label_" prefix; we add it internally


# Labels of one message (uid set) or labels in use (uid omitted).
class ListLabelsRequest(BaseModel):
    folder: str = Field(default="INBOX", description="IMAP folder")
    uid: Optional[int] = Field(default=None, description="If set, list labels for that message; else list used labels in folder.")
# New outgoing message, optionally threaded onto an existing conversation.
class SendEmailRequest(BaseModel):
    to: List[str]
    subject: str
    body: str
    cc: Optional[List[str]] = None
    bcc: Optional[List[str]] = None
    html: bool = Field(default=False, description="Treat body as HTML if true")
    in_reply_to: Optional[str] = Field(default=None, description="In-Reply-To header for replies")
    references: Optional[str] = Field(default=None, description="References header for thread continuity")


# Reply to the message identified by folder + UID.
class ReplyToMessageRequest(BaseModel):
    folder: str
    uid: int
    body: str
    html: bool = False
    to: Optional[List[str]] = Field(default=None, description="Override reply recipients")
    cc: Optional[List[str]] = None
    bcc: Optional[List[str]] = None


# Forward the message identified by folder + UID to new recipients.
class ForwardMessageRequest(BaseModel):
    folder: str
    uid: int
    to: List[str]
    cc: Optional[List[str]] = None
    bcc: Optional[List[str]] = None
    note: str = Field(default="", description="Optional note to prepend")
    html: bool = False


# Draft to store in an IMAP folder.
class SaveDraftRequest(BaseModel):
    to: List[str]
    subject: str
    body: str
    html: bool = False
    folder: str = Field(default="Drafts", description="IMAP folder to save draft")
# Folder to summarize unread messages for.
class GetUnreadSummaryRequest(BaseModel):
    folder: str = "INBOX"


# Message whose attachments should be listed.
class ListAttachmentsRequest(BaseModel):
    folder: str
    uid: int


# One attachment of a message, selected by filename.
class DownloadAttachmentRequest(BaseModel):
    folder: str
    uid: int
    filename: str


# Attachment filename search within one folder.
class SearchAttachmentsRequest(BaseModel):
    folder: str = "INBOX"
    file_pattern: str = Field(default="", description="Filename substring to match (case-insensitive)")
    max_results: int = 50


# Outgoing message to send at a later time.
class ScheduleSendRequest(BaseModel):
    to: List[str]
    subject: str
    body: str
    cc: Optional[List[str]] = None
    bcc: Optional[List[str]] = None
    html: bool = False
    send_at: float = Field(description="Unix timestamp when to send")
    in_reply_to: Optional[str] = None
    references: Optional[str] = None


# Conversation export rooted at one message.
class ExportConversationRequest(BaseModel):
    folder: str
    message_uid: int
    max_messages: int = 50


# Conflict-check search terms and scope.
class ConflictCheckSearchRequest(BaseModel):
    terms: List[str] = Field(description="Terms to search (names, firms, counsel).")
    folders: Optional[List[str]] = Field(default=None, description="Folders to search; defaults to INBOX.")
    max_results: int = 100


# Empty request body.
class GetTriageConfigRequest(BaseModel):
    pass
# --- CardDAV request bodies ---

# Contact search within one address book.
class SearchCardDAVContactsRequest(BaseModel):
    addressbook_href: str
    query: str


# Contact resource to fetch.
class GetCardDAVContactRequest(BaseModel):
    href: str


# New vCard to create in an address book.
class CreateCardDAVContactRequest(BaseModel):
    addressbook_href: str
    vcard: str


# Replacement vCard for an existing contact resource.
class UpdateCardDAVContactRequest(BaseModel):
    href: str
    vcard: str


# Contact resource to delete.
class DeleteCardDAVContactRequest(BaseModel):
    href: str


# --- CalDAV request bodies ---

# Event search within one calendar over a time range.
class SearchCalDAVEventsRequest(BaseModel):
    calendar_href: str
    start: str = Field(description="ISO8601 start, e.g. 20251201T000000Z")
    end: str = Field(description="ISO8601 end, e.g. 20251231T235959Z")


# New iCalendar object to create in a calendar.
class CreateCalDAVEventRequest(BaseModel):
    calendar_href: str
    ical: str


# Replacement iCalendar object for an existing event resource.
class UpdateCalDAVEventRequest(BaseModel):
    href: str
    ical: str


# Event resource to delete.
class DeleteCalDAVEventRequest(BaseModel):
    href: str
# Helper to run IMAP operations in a thread-safe way (FastAPI is async)
import concurrent.futures
import functools

# Shared pool on which the decorated operations run (at most 4 at a time).
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def run_imap_op(fn):
    """Decorator: run *fn* on the shared executor and block for its result.

    ``functools.wraps`` keeps the wrapped function's name and signature
    visible to introspection; without it a framework inspecting the endpoint
    would only see ``(*args, **kwargs)``. Exceptions raised by *fn* propagate
    to the caller.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return executor.submit(fn, *args, **kwargs).result()
    return wrapper
@app.post("/list_folders")
@run_imap_op
def list_folders(_req: ListFoldersRequest):
    # Connect, log in, list every folder, and always log out afterwards.
    conn = create_imap()
    try:
        login_imap(conn)
        return {"folders": list_all_folders(conn)}
    finally:
        conn.logout()
@app.post("/search_messages")
@run_imap_op
def search_messages_endpoint(req: SearchMessagesRequest):
    # Run a subject search on a fresh connection and return the parsed matches.
    conn = create_imap()
    try:
        login_imap(conn)
        found = search_messages(
            conn,
            folder=req.folder,
            query=req.query,
            since=req.since,
            max_results=req.max_results,
        )
        return {"messages": found}
    finally:
        conn.logout()
@app.post("/get_new_messages")
@run_imap_op
def get_new_messages_endpoint(req: GetNewMessagesRequest):
    # Return messages not reported before, each annotated with a triage hint.
    conn = create_imap()
    try:
        login_imap(conn)
        annotated = []
        for message in get_new_messages(conn, req.folder):
            message["triage_hint"] = build_triage_hint(message)
            annotated.append(message)
        return {"messages": annotated}
    finally:
        conn.logout()
@app.post("/sync_seen")
@run_imap_op
def sync_seen_endpoint(req: SyncSeenRequest):
    # Record every message currently in the folder as already seen.
    conn = create_imap()
    try:
        login_imap(conn)
        sync_seen(conn, req.folder)
        response = {"status": "ok", "folder": req.folder}
        return response
    finally:
        conn.logout()
@app.post("/mark_as_read")
@run_imap_op
def mark_as_read_endpoint(req: MarkAsReadRequest):
    # Mark one message as read on a fresh connection.
    conn = create_imap()
    try:
        login_imap(conn)
        mark_as_read(conn, req.folder, req.uid)
        response = {"status": "ok"}
        return response
    finally:
        conn.logout()
@app.post("/flag_message")
@run_imap_op
def flag_message_endpoint(req: FlagMessageRequest):
    # Flag one message on a fresh connection.
    conn = create_imap()
    try:
        login_imap(conn)
        flag_message(conn, req.folder, req.uid)
        response = {"status": "ok"}
        return response
    finally:
        conn.logout()
@app.post("/move_message")
@run_imap_op
def move_message_endpoint(req: MoveMessageRequest):
    # Move one message to another folder and report the destination.
    conn = create_imap()
    try:
        login_imap(conn)
        destination = req.dest_folder
        move_message(conn, req.folder, req.uid, destination)
        return {"status": "moved", "dest_folder": req.dest_folder}
    finally:
        conn.logout()
@app.post("/copy_message")
@run_imap_op
def copy_message_endpoint(req: CopyMessageRequest):
    """Call ``copy_message`` to copy ``req.uid`` from ``req.folder`` to ``req.dest_folder``.

    Returns the destination folder with status ``"copied"``; the IMAP session
    is always logged out.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        copy_message(conn, req.folder, req.uid, req.dest_folder)
        return dict(status="copied", dest_folder=req.dest_folder)
    finally:
        conn.logout()
@app.post("/apply_label")
@run_imap_op
def apply_label_endpoint(req: ManageLabelRequest):
    """Add the flag ``$Label_<label>`` to message ``req.uid`` via ``add_flags``.

    Returns the label with status ``"applied"``; the IMAP session is always
    logged out.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        # Labels are stored as flags carrying a "$Label_" prefix.
        keyword = f"$Label_{req.label}"
        add_flags(conn, req.folder, req.uid, [keyword])
        return dict(status="applied", label=req.label)
    finally:
        conn.logout()
@app.post("/remove_label")
@run_imap_op
def remove_label_endpoint(req: ManageLabelRequest):
    """Remove the flag ``$Label_<label>`` from message ``req.uid`` via ``remove_flags``.

    Returns the label with status ``"removed"``; the IMAP session is always
    logged out.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        # Labels are stored as flags carrying a "$Label_" prefix.
        keyword = f"$Label_{req.label}"
        remove_flags(conn, req.folder, req.uid, [keyword])
        return dict(status="removed", label=req.label)
    finally:
        conn.logout()
@app.post("/list_labels")
@run_imap_op
def list_labels_endpoint(req: ListLabelsRequest):
    """List labels for one message, or all available labels.

    With a truthy ``req.uid`` the result of ``get_labels`` for that message
    is returned with the first ``"$Label_"`` occurrence removed from each
    entry. Otherwise the result of ``list_available_labels`` is returned
    unchanged. The IMAP session is always logged out.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        if not req.uid:
            return {"labels": list_available_labels(conn)}
        flags = get_labels(conn, req.folder, req.uid)
        return {"labels": [flag.replace("$Label_", "", 1) for flag in flags]}
    finally:
        conn.logout()
@app.post("/get_unread_summary")
@run_imap_op
def get_unread_summary_endpoint(req: GetUnreadSummaryRequest):
    """Report the unread count for a folder together with short previews.

    Selects ``req.folder``, runs a UID SEARCH for UNSEEN messages and returns
    ``{"folder", "unread_count", "previews"}``. Previews are built for at
    most the first 20 UIDs returned by the search and contain the subject,
    sender, date and the first 300 characters of the plain-text body. A
    failed or empty search yields a count of 0 and no previews. The IMAP
    session is always logged out.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        ensure_selected(imap, req.folder)
        status, data = imap.uid("SEARCH", None, "UNSEEN")
        if status == "OK" and data and data[0]:
            uids = [int(raw_uid) for raw_uid in data[0].split()]
        else:
            uids = []
        # Provide short previews for up to 20
        previews = []
        for uid in uids[:20]:
            # Pass the folder as well: the reply and forward endpoints call
            # parse_message(imap, folder, uid), and the two-argument call used
            # here previously did not match that form.
            # NOTE(review): parse_message is defined outside this section -
            # confirm its signature is (imap, folder, uid).
            msg = parse_message(imap, req.folder, uid)
            previews.append({
                "uid": uid,
                "subject": msg["subject"],
                "from": msg["from"],
                "date": msg["date"],
                "snippet": (msg["body_plain"] or "")[:300],
            })
        return {
            "folder": req.folder,
            "unread_count": len(uids),
            "previews": previews,
        }
    finally:
        imap.logout()
@app.post("/send_email")
def send_email_endpoint(req: SendEmailRequest):
    """Send a message through ``send_email`` using the request's fields.

    Returns ``{"status": "sent"}``.

    Raises:
        HTTPException: status 500 carrying the error text when anything
            raised while sending.
    """
    try:
        outgoing = {
            "to": req.to,
            "subject": req.subject,
            "body": req.body,
            "cc": req.cc,
            "bcc": req.bcc,
            "html": req.html,
            "in_reply_to": req.in_reply_to,
            "references": req.references,
        }
        send_email(**outgoing)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"status": "sent"}
@app.post("/reply_to_message")
@run_imap_op
def reply_to_message_endpoint(req: ReplyToMessageRequest):
    """Reply to the message ``req.uid`` in ``req.folder``.

    Recipients are ``req.to`` when given; otherwise the addresses found in
    the original "To" and "From" headers, de-duplicated case-insensitively
    while keeping their first-seen order. The subject gets a ``"Re: "``
    prefix unless it already starts with "re:" (case-insensitive). The
    original Message-ID becomes ``In-Reply-To`` and is appended to the
    existing ``References`` chain.

    Returns ``{"status": "replied"}``.

    Raises:
        HTTPException: status 500 carrying the error text when anything
            raised while reading the original or sending the reply.
    """
    from email.utils import getaddresses

    imap = create_imap()
    try:
        login_imap(imap)
        msg = parse_message(imap, req.folder, req.uid)
        # Determine reply recipients
        if req.to:
            reply_to_list = req.to
        else:
            # default: reply to "To" and "From"
            # Skip absent headers: getaddresses() stringifies each value, so
            # a None header would otherwise turn into a bogus "None" address.
            headers = [value for value in (msg["to"], msg["from"]) if value]
            seen = set()
            reply_to_list = []
            for _, addr in getaddresses(headers):
                key = addr.lower()
                # The same mailbox can appear in both headers; send one copy.
                if addr and key not in seen:
                    seen.add(key)
                    reply_to_list.append(addr)
        # A message without a subject must not crash the prefix check.
        subject = msg["subject"] or ""
        if not subject.lower().startswith("re:"):
            subject = f"Re: {subject}"
        # Thread headers
        in_reply_to = msg.get("message_id") or None
        refs = (msg.get("references") or "").strip()
        if in_reply_to and refs:
            refs = f"{refs} {in_reply_to}"
        elif in_reply_to:
            refs = in_reply_to
        send_email(
            to=reply_to_list,
            subject=subject,
            body=req.body,
            cc=req.cc,
            bcc=req.bcc,
            html=req.html,
            in_reply_to=in_reply_to,
            references=refs,
        )
        return {"status": "replied"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        imap.logout()
@app.post("/forward_message")
@run_imap_op
def forward_message_endpoint(req: ForwardMessageRequest):
    """Forward the message ``req.uid`` in ``req.folder`` to ``req.to``.

    The subject gets a ``"Fwd: "`` prefix unless it already starts with
    "fwd:" (case-insensitive). The outgoing body is ``req.note`` (if any)
    followed by ``req.body``, then the original text quoted line by line
    with ``"> "``. The plain-text body is used for quoting, falling back to
    the HTML body.

    Returns ``{"status": "forwarded"}``.

    Raises:
        HTTPException: status 500 carrying the error text when anything
            raised while reading the original or sending.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        source = parse_message(conn, req.folder, req.uid)

        original_subject = source["subject"]
        already_prefixed = original_subject.lower().startswith("fwd:")
        subject = original_subject if already_prefixed else f"Fwd: {original_subject}"

        text = req.body or ""
        if req.note:
            text = f"{req.note}\n\n" + text

        # Include original message as quoted text
        quoted_source = source["body_plain"] or (source["body_html"] or "")
        if quoted_source:
            quoted_lines = ["> " + line for line in quoted_source.splitlines()]
            text = (text + "\n\n" + "\n".join(quoted_lines)).strip()

        send_email(
            to=req.to,
            subject=subject,
            body=text,
            cc=req.cc,
            bcc=req.bcc,
            html=req.html,
        )
        return {"status": "forwarded"}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        conn.logout()
@app.post("/save_draft")
@run_imap_op
def save_draft_endpoint(req: SaveDraftRequest):
    """Store a draft through ``save_draft_to_imap`` and return its result.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        saved = save_draft_to_imap(
            conn, req.folder, req.to, req.subject, req.body, req.html
        )
        return saved
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        conn.logout()
@app.post("/list_attachments")
@run_imap_op
def list_attachments_endpoint(req: ListAttachmentsRequest):
    """Return ``{"attachments": ...}`` from ``list_attachments`` for one message.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        return {"attachments": list_attachments(conn, req.folder, req.uid)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        conn.logout()
@app.post("/download_attachment")
@run_imap_op
def download_attachment_endpoint(req: DownloadAttachmentRequest):
    """Return the result of ``download_attachment`` for ``req.filename``.

    Raises:
        HTTPException: status 404 when ``download_attachment`` raises
            ``ValueError``; status 500 for any other failure. Both carry the
            error text as detail.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        return download_attachment(conn, req.folder, req.uid, req.filename)
    except ValueError as missing:
        raise HTTPException(status_code=404, detail=str(missing))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        conn.logout()
@app.post("/search_attachments")
@run_imap_op
def search_attachments_endpoint(req: SearchAttachmentsRequest):
    """Return ``{"results": ...}`` from ``search_attachments``.

    Forwards ``folder``, ``file_pattern`` and ``max_results`` from the request.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        matches = search_attachments(
            conn, req.folder, req.file_pattern, req.max_results
        )
        return {"results": matches}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        conn.logout()
@app.post("/schedule_send")
def schedule_send_endpoint(req: ScheduleSendRequest):
    """Register a delayed send through ``add_scheduled_send``.

    Returns the task id it produces together with status ``"scheduled"``.
    """
    job = {
        "to": req.to,
        "subject": req.subject,
        "body": req.body,
        "cc": req.cc,
        "bcc": req.bcc,
        "html": req.html,
        "send_at": req.send_at,
        "in_reply_to": req.in_reply_to,
        "references": req.references,
    }
    return {"status": "scheduled", "task_id": add_scheduled_send(**job)}
@app.post("/export_conversation")
@run_imap_op
def export_conversation_endpoint(req: ExportConversationRequest):
    """Return ``{"export": ...}`` with the output of ``export_conversation``.

    Forwards ``folder``, ``message_uid`` and ``max_messages`` from the request.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        exported = export_conversation(
            conn, req.folder, req.message_uid, req.max_messages
        )
        return {"export": exported}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        conn.logout()
@app.post("/conflict_check_search")
@run_imap_op
def conflict_check_search_endpoint(req: ConflictCheckSearchRequest):
    """Return ``{"results": ...}`` from ``conflict_check_search``.

    Forwards ``terms``, ``folders`` and ``max_results`` from the request. The
    IMAP session is always logged out.
    """
    conn = create_imap()
    try:
        login_imap(conn)
        hits = conflict_check_search(conn, req.terms, req.folders, req.max_results)
        return {"results": hits}
    finally:
        conn.logout()
@app.post("/get_triage_config")
def get_triage_config_endpoint(_req: GetTriageConfigRequest):
    """Expose the triage guidance held in ``config``.

    Returns ``config.BUSINESS_DESCRIPTION`` and ``config.personnel`` under
    the keys ``business_description`` and ``personnel``.
    """
    return dict(
        business_description=config.BUSINESS_DESCRIPTION,
        personnel=config.personnel,
    )
# CardDAV endpoints
@app.post("/list_carddav_addressbooks")
def list_carddav_addressbooks_endpoint():
    """Return ``{"addressbooks": ...}`` from ``list_carddav_addressbooks``.

    Raises:
        HTTPException: status 400 when the helper raises ``ValueError``;
            status 500 for any other failure. Both carry the error text.
    """
    try:
        return {"addressbooks": list_carddav_addressbooks()}
    except ValueError as bad_request:
        raise HTTPException(status_code=400, detail=str(bad_request))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/search_carddav_contacts")
def search_carddav_contacts_endpoint(req: SearchCardDAVContactsRequest):
    """Return ``{"contacts": ...}`` from ``search_carddav_contacts``.

    Forwards ``addressbook_href`` and ``query`` from the request.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return {"contacts": search_carddav_contacts(req.addressbook_href, req.query)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/get_carddav_contact")
def get_carddav_contact_endpoint(req: GetCardDAVContactRequest):
    """Return whatever ``get_carddav_contact`` yields for ``req.href``.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return get_carddav_contact(req.href)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/create_carddav_contact")
def create_carddav_contact_endpoint(req: CreateCardDAVContactRequest):
    """Return the result of ``create_carddav_contact``.

    Forwards ``addressbook_href`` and ``vcard`` from the request.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return create_carddav_contact(req.addressbook_href, req.vcard)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/update_carddav_contact")
def update_carddav_contact_endpoint(req: UpdateCardDAVContactRequest):
    """Return the result of ``update_carddav_contact``.

    Forwards ``href`` and ``vcard`` from the request.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return update_carddav_contact(req.href, req.vcard)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/delete_carddav_contact")
def delete_carddav_contact_endpoint(req: DeleteCardDAVContactRequest):
    """Return the result of ``delete_carddav_contact`` for ``req.href``.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return delete_carddav_contact(req.href)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# CalDAV endpoints
@app.post("/list_caldav_calendars")
def list_caldav_calendars_endpoint():
    """Return ``{"calendars": ...}`` from ``list_caldav_calendars``.

    Raises:
        HTTPException: status 400 when the helper raises ``ValueError``;
            status 500 for any other failure. Both carry the error text.
    """
    try:
        return {"calendars": list_caldav_calendars()}
    except ValueError as bad_request:
        raise HTTPException(status_code=400, detail=str(bad_request))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/search_caldav_events")
def search_caldav_events_endpoint(req: SearchCalDAVEventsRequest):
    """Return ``{"events": ...}`` from ``search_caldav_events``.

    Forwards ``calendar_href``, ``start`` and ``end`` from the request.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return {"events": search_caldav_events(req.calendar_href, req.start, req.end)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/create_caldav_event")
def create_caldav_event_endpoint(req: CreateCalDAVEventRequest):
    """Return the result of ``create_caldav_event``.

    Forwards ``calendar_href`` and ``ical`` from the request.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return create_caldav_event(req.calendar_href, req.ical)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/update_caldav_event")
def update_caldav_event_endpoint(req: UpdateCalDAVEventRequest):
    """Return the result of ``update_caldav_event``.

    Forwards ``href`` and ``ical`` from the request.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return update_caldav_event(req.href, req.ical)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/delete_caldav_event")
def delete_caldav_event_endpoint(req: DeleteCalDAVEventRequest):
    """Return the result of ``delete_caldav_event`` for ``req.href``.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        return delete_caldav_event(req.href)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
# -------------------- ENTRYPOINT --------------------
if __name__ == "__main__":
    # Direct execution: serve the app on every interface, port 8000,
    # with auto-reload switched off.
    import uvicorn

    uvicorn.run(
        "server:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
    )