"""
MCP Email Server - OpenAPI-compatible HTTP server

Features:
- IMAP (login) + SMTP (login)
- List/search all folders (including shared/public)
- New-message tracking and flagging
- Triage guidance (business description + personnel/chain-of-command)
- Extended tools: reply, forward, drafts, move, copy, labels, attachments,
  schedule_send, export_conversation
- Conflict check search
- CalDAV / CardDAV integration (e.g., Nextcloud) for calendar and contacts
- Exposes OpenAPI 3.1 spec for integration with Open WebUI (OpenAPI tool server)

Run:
- uvicorn server:app --host 0.0.0.0 --port 8000
"""

import base64
import concurrent.futures
import contextlib
import email
import email.policy
import functools
import imaplib
import json
import logging
import os
import re
import smtplib
import threading
import time
import uuid
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate, parseaddr
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
from xml.sax.saxutils import escape as _sax_escape

import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
logger = logging.getLogger("mcp-email-server")


# -------------------- CONFIG --------------------

def _env_flag(name: str, default: str) -> bool:
    """Return True when environment variable *name* is "true", "1" or "yes"."""
    return os.getenv(name, default).lower() in ("true", "1", "yes")


@dataclass
class EmailConfig:
    """Server settings; every default is read from the environment at import."""

    # IMAP
    IMAP_HOST: str = os.getenv("IMAP_HOST", "imap.example.com")
    IMAP_PORT: int = int(os.getenv("IMAP_PORT", "993"))
    IMAP_USE_SSL: bool = _env_flag("IMAP_USE_SSL", "true")
    IMAP_USERNAME: str = os.getenv("IMAP_USERNAME", "")
    IMAP_PASSWORD: str = os.getenv("IMAP_PASSWORD", "")

    # SMTP
    SMTP_HOST: str = os.getenv("SMTP_HOST", "smtp.example.com")
    SMTP_PORT: int = int(os.getenv("SMTP_PORT", "587"))
    SMTP_USE_TLS: bool = _env_flag("SMTP_USE_TLS", "true")
    SMTP_USE_SSL: bool = _env_flag("SMTP_USE_SSL", "false")
    SMTP_USERNAME: str = os.getenv("SMTP_USERNAME", "")
    SMTP_PASSWORD: str = os.getenv("SMTP_PASSWORD", "")
    SMTP_FROM: str = os.getenv("SMTP_FROM", "")

    # Optional cc/bcc defaults
    DEFAULT_CC: str = os.getenv("DEFAULT_CC", "")
    DEFAULT_BCC: str = os.getenv("DEFAULT_BCC", "")

    # Business/person description
    BUSINESS_DESCRIPTION: str = os.getenv("BUSINESS_DESCRIPTION", "")

    # Personnel / chain-of-command (JSON)
    PERSONNEL_JSON: str = os.getenv("PERSONNEL_JSON", "[]")

    # Scheduling
    SCHEDULED_SEND_INTERVAL: int = int(os.getenv("SCHEDULED_SEND_INTERVAL", "10"))

    # CardDAV / CalDAV (e.g., Nextcloud)
    # e.g. https://cloud.example.com/remote.php/dav
    DAV_BASE_URL: str = os.getenv("DAV_BASE_URL", "")
    DAV_USERNAME: str = os.getenv("DAV_USERNAME", "")
    DAV_PASSWORD: str = os.getenv("DAV_PASSWORD", "")
    DAV_VERIFY_TLS: bool = _env_flag("DAV_VERIFY_TLS", "true")

    @property
    def personnel(self) -> List[Dict[str, Any]]:
        """Return PERSONNEL_JSON decoded as a list of dicts ([] if invalid)."""
        try:
            parsed = json.loads(self.PERSONNEL_JSON)
        except (TypeError, ValueError):
            return []
        if not isinstance(parsed, list):
            return []
        return [p for p in parsed if isinstance(p, dict)]

    @property
    def default_cc_list(self) -> List[str]:
        """Return DEFAULT_CC split on commas, without empty entries."""
        return [e.strip() for e in self.DEFAULT_CC.split(",") if e.strip()]

    @property
    def default_bcc_list(self) -> List[str]:
        """Return DEFAULT_BCC split on commas, without empty entries."""
        return [e.strip() for e in self.DEFAULT_BCC.split(",") if e.strip()]


config = EmailConfig()


# -------------------- IMAP HELPERS (UID-based) --------------------

# Untagged LIST line: (flags) "delimiter"|NIL mailbox-name
_LIST_LINE_RE = re.compile(
    r'^\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.*)$'
)
# FLAGS item inside a FETCH response, wherever it appears in the item list.
_FLAGS_RE = re.compile(rb"FLAGS \(([^)]*)\)")
# IMAP search date, e.g. 01-Jan-2024.
_IMAP_DATE_RE = re.compile(
    r"^\d{1,2}-(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)-\d{4}$",
    re.IGNORECASE,
)
_LABEL_PREFIX = "$Label_"


def _imap_quote(value: str) -> str:
    """Return *value* as an IMAP quoted string (backslash and quote escaped)."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def _quote_mailbox(name: str) -> str:
    """Quote a mailbox name for imaplib, which sends arguments verbatim."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name  # caller already quoted it
    return _imap_quote(name)


def _unquote_mailbox(name: str) -> str:
    """Strip surrounding quotes and backslash escapes from a LIST name."""
    name = name.strip()
    if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
        return re.sub(r"\\(.)", r"\1", name[1:-1])
    return name


def _parse_flags(fetch_data) -> List[str]:
    """Extract the flag list from the data returned by a ``FETCH (FLAGS)``."""
    for item in fetch_data or []:
        raw = item[0] if isinstance(item, tuple) else item
        if not isinstance(raw, (bytes, bytearray)):
            continue
        match = _FLAGS_RE.search(bytes(raw))
        if match:
            return match.group(1).decode("utf-8", errors="replace").split()
    return []


def _fetch_flags(imap, uid: int) -> List[str]:
    """Fetch and parse the flags of message *uid* in the selected folder."""
    _, flags_data = imap.uid("FETCH", str(uid), "(FLAGS)")
    return _parse_flags(flags_data)


def _fetch_rfc822(imap, uid: int):
    """Fetch message *uid* from the selected folder and parse it.

    Returns an ``email.message.EmailMessage`` or None when the server did not
    return a message body for that UID.
    """
    status, msg_data = imap.uid("FETCH", str(uid), "(RFC822)")
    if status != "OK" or not msg_data:
        return None
    # The literal is the second element of a tuple item; plain bytes items
    # (e.g. the closing b")") carry no message data.
    for item in msg_data:
        if (
            isinstance(item, tuple)
            and len(item) >= 2
            and isinstance(item[1], (bytes, bytearray))
        ):
            return email.message_from_bytes(bytes(item[1]), policy=email.policy.default)
    return None


def _decode_text_part(part) -> str:
    """Decode a text MIME part to str, tolerating bad or missing charsets."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:  # charset name unknown to Python
        return payload.decode("utf-8", errors="replace")


def _search_uids(imap, criteria: str) -> List[int]:
    """Run ``UID SEARCH criteria`` in the selected folder; return UIDs."""
    status, data = imap.uid("SEARCH", None, criteria)
    if status != "OK" or not data or not data[0]:
        return []
    return [int(u) for u in data[0].split()]


def create_imap():
    """Open an IMAP connection (SSL, or plain upgraded via STARTTLS)."""
    if config.IMAP_USE_SSL:
        return imaplib.IMAP4_SSL(config.IMAP_HOST, config.IMAP_PORT)
    m = imaplib.IMAP4(config.IMAP_HOST, config.IMAP_PORT)
    m.starttls()
    return m


def login_imap(imap):
    """Authenticate *imap* with the configured username and password."""
    imap.login(config.IMAP_USERNAME, config.IMAP_PASSWORD)


@contextlib.contextmanager
def _imap_session():
    """Yield a logged-in IMAP connection and always log out afterwards."""
    imap = create_imap()
    try:
        login_imap(imap)
        yield imap
    finally:
        try:
            imap.logout()
        except Exception as e:  # do not mask the real error with a logout failure
            logger.debug("IMAP logout failed: %s", e)


def list_all_folders(imap):
    """Return ``[{"path": name}, ...]`` for every mailbox the server lists."""
    _, data = imap.list()
    folders = []
    for item in data or []:
        if isinstance(item, tuple):
            # Name sent as a literal: (b'(flags) "/" {n}', b'name')
            raw_name = item[1] if len(item) > 1 else b""
            name = (
                raw_name.decode("utf-8", errors="replace")
                if isinstance(raw_name, (bytes, bytearray))
                else str(raw_name)
            )
        elif isinstance(item, (bytes, bytearray, str)):
            line = (
                item.decode("utf-8", errors="replace")
                if isinstance(item, (bytes, bytearray))
                else item
            )
            match = _LIST_LINE_RE.match(line.strip())
            if not match:
                continue
            name = _unquote_mailbox(match.group("name"))
        else:
            continue
        if name:
            folders.append({"path": name})
    return folders


def ensure_selected(imap, folder: str):
    """Select *folder* read-write.

    Raises:
        RuntimeError: if the server refuses the SELECT.
    """
    status, data = imap.select(_quote_mailbox(folder), readonly=False)
    if status != "OK":
        raise RuntimeError(f"Failed to select folder {folder!r}: {data}")


def parse_message(imap, uid: int):
    """Fetch message *uid* from the currently selected folder as a dict.

    The dict has the keys uid, subject, from, to, cc, date, body_plain,
    body_html, flags, message_id, in_reply_to and references.

    Raises:
        RuntimeError: if the message cannot be fetched.
    """
    msg = _fetch_rfc822(imap, uid)
    if msg is None:
        raise RuntimeError(f"Failed to fetch message UID {uid}")

    def header(name: str) -> str:
        # str() turns policy header objects into plain strings for JSON output.
        return str(msg.get(name, "") or "")

    body_plain = ""
    body_html = ""
    if msg.is_multipart():
        for part in msg.walk():
            pdisp = (part.get("Content-Disposition") or "").lower()
            if "attachment" in pdisp:
                continue
            ptype = part.get_content_type()
            if ptype == "text/plain":
                body_plain += _decode_text_part(part)
            elif ptype == "text/html":
                body_html += _decode_text_part(part)
    else:
        ptype = msg.get_content_type()
        if ptype == "text/plain":
            body_plain = _decode_text_part(msg)
        elif ptype == "text/html":
            body_html = _decode_text_part(msg)

    return {
        "uid": uid,
        "subject": header("Subject"),
        "from": header("From"),
        "to": header("To"),
        "cc": header("Cc"),
        "date": header("Date"),
        "body_plain": body_plain,
        "body_html": body_html,
        "flags": _fetch_flags(imap, uid),
        "message_id": header("Message-ID"),
        "in_reply_to": header("In-Reply-To"),
        "references": header("References"),
    }


def search_messages(imap, folder: str, query: str, since: Optional[str], max_results: int):
    """Search *folder* by subject substring and/or SINCE date.

    Returns at most *max_results* parsed messages (highest UIDs); messages
    that fail to parse are logged and skipped.

    Raises:
        ValueError: if *since* is not in IMAP date form (e.g. 01-Jan-2024).
    """
    if max_results <= 0:
        return []
    ensure_selected(imap, folder)

    criteria = [f"SUBJECT {_imap_quote(query)}" if query else "ALL"]
    if since:
        since = since.strip().strip('"')
        if not _IMAP_DATE_RE.match(since):
            raise ValueError(f"Invalid IMAP date for 'since': {since!r}")
        criteria.append(f"SINCE {since}")

    results = []
    for uid in _search_uids(imap, " ".join(criteria))[-max_results:]:
        try:
            results.append(parse_message(imap, uid))
        except Exception as e:
            logger.error(f"Error parsing message UID {uid}: {e}")
    return results


def mark_as_read(imap, folder: str, uid: int):
    """Set the \\Seen flag on message *uid* in *folder*."""
    ensure_selected(imap, folder)
    imap.uid("STORE", str(uid), "+FLAGS", "(\\Seen)")


def flag_message(imap, folder: str, uid: int):
    """Set the \\Flagged flag on message *uid* in *folder*."""
    ensure_selected(imap, folder)
    imap.uid("STORE", str(uid), "+FLAGS", "(\\Flagged)")


def copy_message(imap, folder: str, uid: int, dest_folder: str):
    """Copy message *uid* from *folder* to *dest_folder*.

    Raises:
        RuntimeError: if the server rejects the COPY.
    """
    ensure_selected(imap, folder)
    status, data = imap.uid("COPY", str(uid), _quote_mailbox(dest_folder))
    if status != "OK":
        raise RuntimeError(f"Failed to copy UID {uid} to {dest_folder!r}: {data}")


def move_message(imap, folder: str, uid: int, dest_folder: str):
    """Move message *uid* from *folder* to *dest_folder* (copy, then delete).

    Raises:
        RuntimeError: if the copy fails; the source message is then left alone.
    """
    # copy_message raises on failure, so the original is only deleted once a
    # copy exists in the destination.
    copy_message(imap, folder, uid, dest_folder)
    imap.uid("STORE", str(uid), "+FLAGS", "(\\Deleted)")
    if "UIDPLUS" in (getattr(imap, "capabilities", None) or ()):
        # UID EXPUNGE removes only this message, not every \Deleted one.
        imap.uid("EXPUNGE", str(uid))
    else:
        imap.expunge()


def _store_flags(imap, folder: str, uid: int, op: str, flags: List[str]):
    """Apply ``STORE op (flags...)`` to message *uid*; no-op for empty flags."""
    if not flags:
        return
    ensure_selected(imap, folder)
    # imaplib only accepts str/bytes arguments, so build the list ourselves.
    imap.uid("STORE", str(uid), op, "(" + " ".join(flags) + ")")


def add_flags(imap, folder: str, uid: int, flags: List[str]):
    """Add *flags* to message *uid* in *folder*."""
    _store_flags(imap, folder, uid, "+FLAGS", flags)


def remove_flags(imap, folder: str, uid: int, flags: List[str]):
    """Remove *flags* from message *uid* in *folder*."""
    _store_flags(imap, folder, uid, "-FLAGS", flags)


def get_labels(imap, folder: str, uid: int):
    """Return the ``$Label_*`` flags set on message *uid* in *folder*."""
    ensure_selected(imap, folder)
    # Custom labels are stored as flags starting with "$Label_".
    return [f for f in _fetch_flags(imap, uid) if f.startswith(_LABEL_PREFIX)]


def list_available_labels(imap):
    """Return sorted label names found by sampling folders (heuristic).

    Scans the first 20 folders and the first 20 UIDs of each; folders that
    cannot be read are skipped.
    """
    labels = set()
    try:
        folders = list_all_folders(imap)
    except Exception as e:
        logger.warning("Could not list folders while collecting labels: %s", e)
        return []
    for f in folders[:20]:  # limit scan
        try:
            ensure_selected(imap, f["path"])
            for uid in _search_uids(imap, "ALL")[:20]:  # sample first 20 UIDs
                for fl in _fetch_flags(imap, uid):
                    if fl.startswith(_LABEL_PREFIX):
                        labels.add(fl.replace(_LABEL_PREFIX, "", 1))
        except Exception as e:
            logger.debug("Skipping folder %r during label scan: %s", f["path"], e)
    return sorted(labels)


# -------------------- SMTP HELPERS --------------------

def _merge_addresses(*groups) -> List[str]:
    """Concatenate address lists, dropping duplicates but keeping order."""
    merged: Dict[str, None] = {}
    for group in groups:
        for addr in group or []:
            merged.setdefault(addr, None)
    return list(merged)


def _make_message_id() -> str:
    """Build a Message-ID using the domain of SMTP_FROM (or "localhost")."""
    domain = parseaddr(config.SMTP_FROM)[1].rpartition("@")[2] or "localhost"
    return f"<{uuid.uuid4().hex}@{domain}>"


def send_email(
    to: List[str],
    subject: str,
    body: str,
    cc: Optional[List[str]] = None,
    bcc: Optional[List[str]] = None,
    html: bool = False,
    in_reply_to: Optional[str] = None,
    references: Optional[str] = None,
):
    """Send one message over SMTP.

    DEFAULT_CC / DEFAULT_BCC are merged into *cc* / *bcc*. Bcc addresses are
    envelope recipients only and never appear in a header.

    Raises:
        ValueError: if SMTP_FROM is not configured or there is no recipient.
    """
    if not config.SMTP_FROM:
        raise ValueError("SMTP_FROM not configured")

    to = list(to or [])
    cc = _merge_addresses(cc, config.default_cc_list)
    bcc = _merge_addresses(bcc, config.default_bcc_list)
    all_recipients = _merge_addresses(to, cc, bcc)
    if not all_recipients:
        raise ValueError("No recipients given")

    if html:
        m = MIMEMultipart("alternative")
        m.attach(MIMEText(body, "html"))
    else:
        m = MIMEText(body, "plain")

    m["From"] = config.SMTP_FROM
    m["To"] = ", ".join(to)
    if cc:
        m["Cc"] = ", ".join(cc)
    m["Subject"] = subject
    m["Date"] = formatdate(localtime=True)
    m["Message-ID"] = _make_message_id()
    if in_reply_to:
        m["In-Reply-To"] = in_reply_to
    if references:
        m["References"] = references

    smtp_cls = smtplib.SMTP_SSL if config.SMTP_USE_SSL else smtplib.SMTP
    # The context manager closes the connection even if sending fails.
    with smtp_cls(config.SMTP_HOST, config.SMTP_PORT) as s:
        if not config.SMTP_USE_SSL and config.SMTP_USE_TLS:
            s.starttls()
        if config.SMTP_USERNAME:
            s.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
        s.sendmail(config.SMTP_FROM, all_recipients, m.as_string())


# -------------------- NEW-MESSAGE TRACKING --------------------

# folder -> UIDs already reported; in-memory only, lost on restart.
last_seen_uids: Dict[str, set] = {}


def get_new_messages(imap, folder: str):
    """Return parsed messages whose UID was not seen on the previous call.

    On the first call for a folder every message counts as new; call
    ``sync_seen`` first to establish a baseline. Messages that fail to parse
    are logged and skipped.
    """
    ensure_selected(imap, folder)
    uids = set(_search_uids(imap, "ALL"))
    if not uids:
        return []
    prev = last_seen_uids.setdefault(folder, set())
    new_uids = sorted(uids - prev)
    last_seen_uids[folder] = uids

    results = []
    for uid in new_uids:
        try:
            results.append(parse_message(imap, uid))
        except Exception as e:
            logger.error(f"Error parsing message UID {uid}: {e}")
    return results


def sync_seen(imap, folder: str):
    """Record every UID currently in *folder* as already seen."""
    ensure_selected(imap, folder)
    last_seen_uids[folder] = set(_search_uids(imap, "ALL"))


# -------------------- TRIAGE HELPERS --------------------

def build_triage_hint(msg: Dict[str, Any]) -> str:
    """Build a one-line triage hint for *msg* from the configured context.

    Combines the business description, a note when the sender's address is in
    the personnel list, and an urgency note based on subject keywords. Returns
    "" when neither a business description nor personnel are configured.
    """
    personnel = config.personnel
    if not config.BUSINESS_DESCRIPTION and not personnel:
        return ""

    hint_parts = []
    if config.BUSINESS_DESCRIPTION:
        hint_parts.append(f"Context: {config.BUSINESS_DESCRIPTION}")

    sender_email = parseaddr(msg.get("from", "") or "")[1].lower()
    personnel_by_email = {
        p["email"].lower(): p
        for p in personnel
        if isinstance(p.get("email"), str) and p["email"]
    }
    sender_info = personnel_by_email.get(sender_email)
    if sender_info:
        # name/role are optional in PERSONNEL_JSON; do not crash without them.
        name = sender_info.get("name") or sender_email
        role = sender_info.get("role")
        described = f"{name} ({role})" if role else str(name)
        hint_parts.append(f"Sender is internal: {described}.")
        if sender_info.get("escalates_to"):
            hint_parts.append(
                f"If escalation needed, escalate to: {sender_info['escalates_to']}."
            )

    subject = (msg.get("subject") or "").lower()
    if any(w in subject for w in ("urgent", "asap", "escalat", "priority")):
        hint_parts.append("Message appears urgent; consider escalation.")

    return " ".join(hint_parts)


# -------------------- CONFLICT CHECK SEARCH --------------------

def conflict_check_search(
    imap,
    terms: List[str],
    folders: Optional[List[str]] = None,
    max_results: int = 100,
):
    """
    Search emails across specified folders (or INBOX if not provided) for
    messages mentioning any of the given terms (names, firms, counsel).
    Useful for conflict checks.

    Uses the IMAP TEXT key, so headers and body are both searched. Results
    are de-duplicated by (folder, uid) and capped at *max_results*.
    """
    if max_results <= 0:
        return []
    if not folders:
        folders = ["INBOX"]

    seen = set()
    unique = []
    for folder in folders:
        try:
            ensure_selected(imap, folder)
        except Exception as e:
            logger.warning("Conflict search: skipping folder %r: %s", folder, e)
            continue
        for term in terms:
            if not term:
                continue
            try:
                uids = _search_uids(imap, f"TEXT {_imap_quote(term)}")
                for uid in uids[-max_results:]:
                    key = (folder, uid)
                    if key in seen:
                        continue
                    msg = parse_message(imap, uid)
                    seen.add(key)
                    unique.append({
                        "folder": folder,
                        "uid": uid,
                        "subject": msg["subject"],
                        "from": msg["from"],
                        "to": msg["to"],
                        "cc": msg["cc"],
                        "date": msg["date"],
                        "matched_term": term,
                        "snippet": (msg["body_plain"] or "")[:400],
                    })
            except Exception as e:
                logger.error(f"Conflict search error for term '{term}' in {folder}: {e}")
    return unique[:max_results]


# -------------------- ATTACHMENTS --------------------

def _iter_attachment_parts(msg):
    """Yield the MIME parts of *msg* whose disposition says "attachment"."""
    for part in msg.walk():
        pdisp = (part.get("Content-Disposition") or "").lower()
        if "attachment" in pdisp:
            yield part


def list_attachments(imap, folder: str, uid: int) -> List[Dict[str, Any]]:
    """Return filename, content type and decoded size of each attachment."""
    ensure_selected(imap, folder)
    msg = _fetch_rfc822(imap, uid)
    if msg is None:
        return []
    return [
        {
            "filename": part.get_filename() or "attachment",
            "content_type": part.get_content_type(),
            "size": len(part.get_payload(decode=True) or b""),
        }
        for part in _iter_attachment_parts(msg)
    ]


def download_attachment(imap, folder: str, uid: int, filename: str) -> Dict[str, Any]:
    """Return the attachment named *filename* as base64.

    Raises:
        ValueError: if the message cannot be fetched or has no such attachment.
    """
    ensure_selected(imap, folder)
    msg = _fetch_rfc822(imap, uid)
    if msg is None:
        raise ValueError("Failed to fetch message for attachment")
    for part in _iter_attachment_parts(msg):
        part_filename = part.get_filename() or "attachment"
        if part_filename != filename:
            continue
        payload = part.get_payload(decode=True) or b""
        return {
            "filename": part_filename,
            "content_type": part.get_content_type(),
            "size": len(payload),
            "content_base64": base64.b64encode(payload).decode("ascii"),
        }
    raise ValueError(f"Attachment not found: {filename}")


def search_attachments(imap, folder: str, file_pattern: str, max_results: int):
    """Find messages with an attachment whose filename contains *file_pattern*.

    The match is case-insensitive. Only the most recent ``max_results * 5``
    UIDs are scanned, and at most one hit is reported per message.
    """
    if max_results <= 0:
        return []
    ensure_selected(imap, folder)
    uids = _search_uids(imap, "ALL")[-max_results * 5:]  # sample more to find matches
    pattern = (file_pattern or "").lower()

    results = []
    for uid in uids:
        if len(results) >= max_results:
            break
        try:
            msg = _fetch_rfc822(imap, uid)
            if msg is None:
                continue
            for part in _iter_attachment_parts(msg):
                filename = part.get_filename() or ""
                if pattern and pattern not in filename.lower():
                    continue
                results.append({
                    "uid": uid,
                    "subject": str(msg.get("Subject", "") or ""),
                    "from": str(msg.get("From", "") or ""),
                    "date": str(msg.get("Date", "") or ""),
                    "filename": filename or "attachment",
                })
                break  # one match per message is enough
        except Exception as e:
            logger.error(f"Error scanning message UID {uid}: {e}")
    return results


# -------------------- DRAFTS (IMAP Drafts folder) --------------------

def save_draft_to_imap(
    imap,
    folder: str,
    to: List[str],
    subject: str,
    body: str,
    html: bool = False,
):
    """Append a draft message to *folder*, flagged \\Draft.

    Raises:
        RuntimeError: if the server rejects the APPEND.
    """
    m = MIMEMultipart()
    m["From"] = config.SMTP_FROM
    m["To"] = ", ".join(to or [])
    m["Subject"] = subject
    m["Date"] = formatdate(localtime=True)
    m["Message-ID"] = _make_message_id()
    m.attach(MIMEText(body, "html" if html else "plain"))

    status, data = imap.append(
        _quote_mailbox(folder), "(\\Draft)", time.time(), m.as_bytes()
    )
    if status != "OK":
        raise RuntimeError(f"Failed to save draft to {folder!r}: {data}")
    return {"status": "draft_saved", "folder": folder}


# -------------------- SCHEDULED SEND (in-memory) --------------------

# task_id -> send_email() keyword arguments plus "send_at"; lost on restart.
scheduled_sends: Dict[str, Dict[str, Any]] = {}
schedule_lock = threading.Lock()


def add_scheduled_send(to, subject, body, cc, bcc, html, send_at, in_reply_to, references):
    """Queue a message to be sent at Unix time *send_at*; return its task id."""
    task_id = uuid.uuid4().hex
    task = {
        "to": to,
        "subject": subject,
        "body": body,
        "cc": cc,
        "bcc": bcc,
        "html": html,
        "send_at": float(send_at),
        "in_reply_to": in_reply_to,
        "references": references,
    }
    with schedule_lock:
        scheduled_sends[task_id] = task
    return task_id


def scheduled_send_loop():
    """Forever: send every queued message whose time has come, then sleep."""
    while True:
        now = time.time()
        with schedule_lock:
            ready = [tid for tid, t in scheduled_sends.items() if t["send_at"] <= now]
        for tid in ready:
            with schedule_lock:
                t = scheduled_sends.pop(tid, None)
            if not t:
                continue
            try:
                send_email(
                    to=t["to"],
                    subject=t["subject"],
                    body=t["body"],
                    cc=t["cc"],
                    bcc=t["bcc"],
                    html=t["html"],
                    in_reply_to=t.get("in_reply_to"),
                    references=t.get("references"),
                )
            except Exception as e:
                # The task was already popped, so a failed send is not retried.
                logger.error(f"Scheduled send error for {tid}: {e}")
        time.sleep(config.SCHEDULED_SEND_INTERVAL)


scheduler_thread = threading.Thread(target=scheduled_send_loop, daemon=True)
scheduler_thread.start()


# -------------------- EXPORT CONVERSATION --------------------

_SUBJECT_PREFIX_RE = re.compile(r"^\s*((re|fwd?|aw|wg)\s*:\s*)+", re.IGNORECASE)


def export_conversation(imap, folder: str, message_uid: int, max_messages: int):
    """Export the thread around *message_uid* in *folder* as plain text.

    Basic approach: candidates are messages in the same folder whose subject
    contains the root subject (reply/forward prefixes removed); a candidate is
    kept when its Message-ID / In-Reply-To / References link it to the root.
    Returns the text, or an error string if the root cannot be fetched.
    """
    ensure_selected(imap, folder)
    root = _fetch_rfc822(imap, message_uid)
    if root is None:
        return "Failed to fetch root message."

    subject = str(root.get("Subject", "") or "").strip()
    # Search on the bare subject so "X", "Re: X" and "Fwd: X" are all found.
    base_subject = _SUBJECT_PREFIX_RE.sub("", subject).strip()
    msg_id = str(root.get("Message-ID", "") or "").strip()
    refs = set(str(root.get("References", "") or "").split())
    in_reply = str(root.get("In-Reply-To", "") or "").strip()
    if in_reply:
        refs.add(in_reply)

    criteria = f"SUBJECT {_imap_quote(base_subject)}" if base_subject else "ALL"
    thread_uids = []
    for uid in _search_uids(imap, criteria):
        if len(thread_uids) >= max_messages:
            break
        m = _fetch_rfc822(imap, uid)
        if m is None:
            continue
        mid = str(m.get("Message-ID", "") or "").strip()
        mrefs = set(str(m.get("References", "") or "").split())
        mirt = str(m.get("In-Reply-To", "") or "").strip()
        in_thread = (
            uid == message_uid
            or (msg_id and mid == msg_id)
            or bool(mrefs & refs)
            or (msg_id and msg_id in mrefs)
            or (mirt and (mirt == msg_id or mirt in refs))
            or (mid and mid in refs)
        )
        if in_thread:
            thread_uids.append(uid)

    lines = []
    for uid in thread_uids:
        try:
            msg = parse_message(imap, uid)
        except Exception as e:
            logger.error(f"Error parsing message UID {uid}: {e}")
            continue
        lines.append(f"From: {msg['from']}")
        lines.append(f"To: {msg['to']}")
        lines.append(f"Date: {msg['date']}")
        lines.append(f"Subject: {msg['subject']}")
        lines.append("")
        lines.append(msg["body_plain"].strip())
        lines.append("\n---\n")
    return "\n".join(lines)


# -------------------- CARDdav / CALdav HELPERS --------------------

DAV_NS = {"D": "DAV:"}

# NOTE(review): the request bodies below were rebuilt from RFC 4918 / 4791 /
# 6352; confirm them against the target server (e.g. Nextcloud).
_PROPFIND_COLLECTIONS_BODY = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<d:propfind xmlns:d="DAV:">\n'
    "  <d:prop>\n"
    "    <d:displayname/>\n"
    "    <d:resourcetype/>\n"
    "  </d:prop>\n"
    "</d:propfind>\n"
)


def dav_session():
    """Return a ``requests.Session`` with DAV credentials and TLS setting.

    Raises:
        ValueError: if the DAV URL, username or password is not configured.
    """
    if not config.DAV_BASE_URL or not config.DAV_USERNAME or not config.DAV_PASSWORD:
        raise ValueError("DAV_BASE_URL, DAV_USERNAME, or DAV_PASSWORD not configured")
    s = requests.Session()
    s.auth = (config.DAV_USERNAME, config.DAV_PASSWORD)
    s.verify = config.DAV_VERIFY_TLS
    return s


def _dav_absolute_url(href: str) -> str:
    """Resolve a server-relative href (as found in multistatus) to a URL."""
    return urljoin(config.DAV_BASE_URL, href)


def dav_request(s, method, url, data=None, headers=None, timeout=20):
    """Send one DAV request; *url* may be absolute or a server-relative href.

    Transport errors are logged and re-raised. HTTP status is not checked.
    """
    target = _dav_absolute_url(url)
    try:
        return s.request(method, target, data=data, headers=headers or {}, timeout=timeout)
    except Exception as e:
        logger.error(f"DAV request error: {method} {target}: {e}")
        raise


def _local_name(tag: str) -> str:
    """Strip the ``{namespace}`` prefix from an ElementTree tag."""
    return tag.rpartition("}")[2]


def parse_dav_propfind(xml_text):
    """
    Parse a PROPFIND response and return list of {href, props}.
    props is a dict of {local-name: text}. For a property without text but
    with child elements (e.g. resourcetype) the value is the space-joined
    local names of its children. Returns [] if the XML cannot be parsed.
    """
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return []

    results = []
    for response in root.findall(".//D:response", DAV_NS):
        href_el = response.find("D:href", DAV_NS)
        href = (href_el.text or "").strip() if href_el is not None else ""
        props = {}
        prop_el = response.find("D:propstat/D:prop", DAV_NS)
        if prop_el is not None:
            for child in prop_el:
                text = (child.text or "").strip()
                if not text and len(child):
                    text = " ".join(_local_name(sub.tag) for sub in child)
                props[_local_name(child.tag)] = text
        if href:
            results.append({"href": href, "props": props})
    return results


def _fetch_dav_resources(s, hrefs, body_key: str, kind: str):
    """GET each href; return ``[{"href", body_key: text}]``, skipping failures."""
    items = []
    for href in hrefs:
        href = (href or "").strip()
        if not href:
            continue
        try:
            vr = dav_request(s, "GET", href)
            vr.raise_for_status()
            items.append({"href": href, body_key: vr.text})
        except Exception as e:
            logger.error(f"Error fetching {kind} {href}: {e}")
    return items


# CardDAV helpers

def carddav_base_url():
    """Return the CardDAV collection root under DAV_BASE_URL."""
    return config.DAV_BASE_URL.rstrip("/") + "/cards/"


def list_carddav_addressbooks():
    """List addressbook collections as ``[{"href", "name"}]``."""
    s = dav_session()
    r = dav_request(
        s,
        "PROPFIND",
        carddav_base_url(),
        headers={"Depth": "1", "Content-Type": "application/xml"},
        data=_PROPFIND_COLLECTIONS_BODY,
    )
    r.raise_for_status()

    addressbooks = []
    for e in parse_dav_propfind(r.text):
        href = e["href"]
        props = e.get("props", {})
        # Check resource type for addressbook
        rt = props.get("resourcetype", "")
        if (
            "/cards/" in href
            and href.endswith("/")
            and ("addressbook" in rt.lower() or "addressbook" in href.lower())
        ):
            addressbooks.append({
                "href": href,
                "name": props.get("displayname") or href,
            })
    return addressbooks


def _xml_escape(s: str) -> str:
    """Escape ``& < > " '`` so *s* is safe in XML text and attribute values."""
    return _sax_escape(s, {'"': "&quot;", "'": "&apos;"})


def search_carddav_contacts(addressbook_href: str, query: str):
    """Return vCards in *addressbook_href* whose FN contains *query*."""
    s = dav_session()
    safe_query = _xml_escape(query)
    req_body = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<card:addressbook-query xmlns:d="DAV:" '
        'xmlns:card="urn:ietf:params:xml:ns:carddav">\n'
        "  <d:prop><d:getetag/></d:prop>\n"
        "  <card:filter>\n"
        '    <card:prop-filter name="FN">\n'
        '      <card:text-match collation="i;unicode-casemap" '
        f'match-type="contains">{safe_query}</card:text-match>\n'
        "    </card:prop-filter>\n"
        "  </card:filter>\n"
        "</card:addressbook-query>\n"
    )
    r = dav_request(
        s,
        "REPORT",
        addressbook_href,
        headers={"Depth": "1", "Content-Type": "application/xml"},
        data=req_body.encode("utf-8"),
    )
    r.raise_for_status()
    hrefs = [e["href"] for e in parse_dav_propfind(r.text)]
    return _fetch_dav_resources(s, hrefs, "vcard", "vCard")


def get_carddav_contact(href: str):
    """Fetch one vCard by href."""
    s = dav_session()
    r = dav_request(s, "GET", href)
    r.raise_for_status()
    return {"href": href, "vcard": r.text}


def create_carddav_contact(addressbook_href: str, vcard: str):
    """Store *vcard* under a new random name in *addressbook_href*."""
    s = dav_session()
    url = addressbook_href.rstrip("/") + "/" + f"{uuid.uuid4().hex}.vcf"
    r = dav_request(
        s, "PUT", url, data=vcard.encode("utf-8"), headers={"Content-Type": "text/vcard"}
    )
    r.raise_for_status()
    return {"href": url}


def update_carddav_contact(href: str, vcard: str):
    """Overwrite the vCard at *href*."""
    s = dav_session()
    r = dav_request(
        s, "PUT", href, data=vcard.encode("utf-8"), headers={"Content-Type": "text/vcard"}
    )
    r.raise_for_status()
    return {"href": href}


def delete_carddav_contact(href: str):
    """Delete the vCard at *href*."""
    s = dav_session()
    r = dav_request(s, "DELETE", href)
    r.raise_for_status()
    return {"href": href}


# CalDAV helpers

def caldav_base_url():
    """Return the CalDAV collection root under DAV_BASE_URL."""
    return config.DAV_BASE_URL.rstrip("/") + "/calendars/"


def list_caldav_calendars():
    """List calendar collections as ``[{"href", "name"}]``."""
    s = dav_session()
    r = dav_request(
        s,
        "PROPFIND",
        caldav_base_url(),
        headers={"Depth": "1", "Content-Type": "application/xml"},
        data=_PROPFIND_COLLECTIONS_BODY,
    )
    r.raise_for_status()

    calendars = []
    for e in parse_dav_propfind(r.text):
        href = e["href"]
        props = e.get("props", {})
        rt = props.get("resourcetype", "")
        if (
            "/calendars/" in href
            and href.endswith("/")
            and ("calendar" in rt.lower() or "calendar" in href.lower())
        ):
            calendars.append({
                "href": href,
                "name": props.get("displayname") or href.split("/")[-2],
            })
    return calendars


def search_caldav_events(calendar_href: str, start: str, end: str):
    """Return events in *calendar_href* overlapping [start, end].

    start/end are iCalendar UTC date-times, e.g. "20251201T000000Z".
    """
    s = dav_session()
    req_body = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">\n'
        "  <d:prop><d:getetag/></d:prop>\n"
        "  <c:filter>\n"
        '    <c:comp-filter name="VCALENDAR">\n'
        '      <c:comp-filter name="VEVENT">\n'
        f'        <c:time-range start="{_xml_escape(start)}" end="{_xml_escape(end)}"/>\n'
        "      </c:comp-filter>\n"
        "    </c:comp-filter>\n"
        "  </c:filter>\n"
        "</c:calendar-query>\n"
    )
    r = dav_request(
        s,
        "REPORT",
        calendar_href,
        headers={"Depth": "1", "Content-Type": "application/xml"},
        data=req_body.encode("utf-8"),
    )
    r.raise_for_status()
    hrefs = [e["href"] for e in parse_dav_propfind(r.text)]
    return _fetch_dav_resources(s, hrefs, "ical", "event")


def create_caldav_event(calendar_href: str, ical: str):
    """Store *ical* under a new random name in *calendar_href*."""
    s = dav_session()
    url = calendar_href.rstrip("/") + "/" + f"{uuid.uuid4().hex}.ics"
    r = dav_request(
        s, "PUT", url, data=ical.encode("utf-8"), headers={"Content-Type": "text/calendar"}
    )
    r.raise_for_status()
    return {"href": url}


def update_caldav_event(href: str, ical: str):
    """Overwrite the event at *href*."""
    s = dav_session()
    r = dav_request(
        s, "PUT", href, data=ical.encode("utf-8"), headers={"Content-Type": "text/calendar"}
    )
    r.raise_for_status()
    return {"href": href}


def delete_caldav_event(href: str):
    """Delete the event at *href*."""
    s = dav_session()
    r = dav_request(s, "DELETE", href)
    r.raise_for_status()
    return {"href": href}


# -------------------- FASTAPI / OPENAPI ENDPOINTS --------------------

app = FastAPI(
    title="MCP Email Server",
    description="Email assistant MCP server exposing IMAP/SMTP, CalDAV/CardDAV, and conflict-check operations via OpenAPI.",
    version="1.0.0",
)


# Request models

class ListFoldersRequest(BaseModel):
    pass


class SearchMessagesRequest(BaseModel):
    folder: str = Field(default="INBOX", description="IMAP folder to search")
    query: str = Field(default="", description="Search query (subject)")
    since: Optional[str] = Field(
        default=None,
        description="Optional IMAP SINCE date, e.g. '01-Jan-2024'",
    )
    max_results: int = Field(default=50, description="Max results to return")


class GetNewMessagesRequest(BaseModel):
    folder: str = Field(default="INBOX", description="IMAP folder to check")


class SyncSeenRequest(BaseModel):
    folder: str = Field(default="INBOX", description="IMAP folder")


class MarkAsReadRequest(BaseModel):
    folder: str
    uid: int


class FlagMessageRequest(BaseModel):
    folder: str
    uid: int


class MoveMessageRequest(BaseModel):
    folder: str
    uid: int
    dest_folder: str


class CopyMessageRequest(BaseModel):
    folder: str
    uid: int
    dest_folder: str


class ManageLabelRequest(BaseModel):
    folder: str
    uid: int
    label: str  # without "$Label_" prefix; we add it internally


class ListLabelsRequest(BaseModel):
    folder: str = Field(default="INBOX", description="IMAP folder")
    uid: Optional[int] = Field(default=None, description="If set, list labels for that message; else list used labels in folder.")


class SendEmailRequest(BaseModel):
    to: List[str]
    subject: str
    body: str
    cc: Optional[List[str]] = Field(default=None)
    bcc: Optional[List[str]] = Field(default=None)
    html: bool = Field(default=False, description="Treat body as HTML if true")
    in_reply_to: Optional[str] = Field(default=None, description="In-Reply-To header for replies")
    references: Optional[str] = Field(default=None, description="References header for thread continuity")


class ReplyToMessageRequest(BaseModel):
    folder: str
    uid: int
    body: str
    html: bool = Field(default=False)
    to: Optional[List[str]] = Field(default=None, description="Override reply recipients")
    cc: Optional[List[str]] = Field(default=None)
    bcc: Optional[List[str]] = Field(default=None)


class ForwardMessageRequest(BaseModel):
    folder: str
    uid: int
    to: List[str]
    cc: Optional[List[str]] = Field(default=None)
    bcc: Optional[List[str]] = Field(default=None)
    note: str = Field(default="", description="Optional note to prepend")
    html: bool = Field(default=False)


class SaveDraftRequest(BaseModel):
    to: List[str]
    subject: str
    body: str
    html: bool = Field(default=False)
    folder: str = Field(default="Drafts", description="IMAP folder to save draft")


class GetUnreadSummaryRequest(BaseModel):
    folder: str = Field(default="INBOX")


class ListAttachmentsRequest(BaseModel):
    folder: str
    uid: int


class DownloadAttachmentRequest(BaseModel):
    folder: str
    uid: int
    filename: str


class SearchAttachmentsRequest(BaseModel):
    folder: str = Field(default="INBOX")
    file_pattern: str = Field(default="", description="Filename substring to match (case-insensitive)")
    max_results: int = Field(default=50)


class ScheduleSendRequest(BaseModel):
    to: List[str]
    subject: str
    body: str
    cc: Optional[List[str]] = Field(default=None)
    bcc: Optional[List[str]] = Field(default=None)
    html: bool = Field(default=False)
    send_at: float = Field(description="Unix timestamp when to send")
    in_reply_to: Optional[str] = Field(default=None)
    references: Optional[str] = Field(default=None)


class ExportConversationRequest(BaseModel):
    folder: str
    message_uid: int
    max_messages: int = Field(default=50)


class ConflictCheckSearchRequest(BaseModel):
    terms: List[str] = Field(description="Terms to search (names, firms, counsel).")
    folders: Optional[List[str]] = Field(default=None, description="Folders to search; defaults to INBOX.")
    max_results: int = Field(default=100)


class GetTriageConfigRequest(BaseModel):
    pass


class SearchCardDAVContactsRequest(BaseModel):
    addressbook_href: str
    query: str


class GetCardDAVContactRequest(BaseModel):
    href: str


class CreateCardDAVContactRequest(BaseModel):
    addressbook_href: str
    vcard: str


class UpdateCardDAVContactRequest(BaseModel):
    href: str
    vcard: str


class DeleteCardDAVContactRequest(BaseModel):
    href: str


class SearchCalDAVEventsRequest(BaseModel):
    calendar_href: str
    start: str = Field(description="ISO8601 start, e.g. 20251201T000000Z")
    end: str = Field(description="ISO8601 end, e.g. 20251231T235959Z")


class CreateCalDAVEventRequest(BaseModel):
    calendar_href: str
    ical: str


class UpdateCalDAVEventRequest(BaseModel):
    href: str
    ical: str


class DeleteCalDAVEventRequest(BaseModel):
    href: str


# Helper to run IMAP operations on a small dedicated thread pool, which caps
# the number of concurrent IMAP connections at max_workers.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def run_imap_op(fn):
    """Decorator: run *fn* on the IMAP executor and wait for its result.

    ``functools.wraps`` keeps the wrapped signature visible, so FastAPI still
    sees the ``req`` model parameter instead of ``*args, **kwargs``.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return executor.submit(fn, *args, **kwargs).result()
    return wrapper


@app.post("/list_folders")
@run_imap_op
def list_folders(_req: ListFoldersRequest):
    """List every IMAP folder visible to the configured account."""
    with _imap_session() as imap:
        return {"folders": list_all_folders(imap)}


@app.post("/search_messages")
@run_imap_op
def search_messages_endpoint(req: SearchMessagesRequest):
    """Search a folder by subject and/or date."""
    with _imap_session() as imap:
        results = search_messages(
            imap,
            folder=req.folder,
            query=req.query,
            since=req.since,
            max_results=req.max_results,
        )
        return {"messages": results}


@app.post("/get_new_messages")
@run_imap_op
def get_new_messages_endpoint(req: GetNewMessagesRequest):
    """Return messages not seen on the previous call, each with a triage hint."""
    with _imap_session() as imap:
        out = []
        for m in get_new_messages(imap, req.folder):
            m["triage_hint"] = build_triage_hint(m)
            out.append(m)
        return {"messages": out}


@app.post("/sync_seen")
@run_imap_op
def sync_seen_endpoint(req: SyncSeenRequest):
    """Mark everything currently in the folder as already seen."""
    with _imap_session() as imap:
        sync_seen(imap, req.folder)
        return {"status": "ok", "folder": req.folder}


@app.post("/mark_as_read")
@run_imap_op
def mark_as_read_endpoint(req: MarkAsReadRequest):
    """Set \\Seen on one message."""
    with _imap_session() as imap:
        mark_as_read(imap, req.folder, req.uid)
        return {"status": "ok"}


@app.post("/flag_message")
@run_imap_op
def flag_message_endpoint(req: FlagMessageRequest):
    """Set \\Flagged on one message."""
    with _imap_session() as imap:
        flag_message(imap, req.folder, req.uid)
        return {"status": "ok"}
@app.post("/move_message")
@run_imap_op
def move_message_endpoint(req: MoveMessageRequest):
    """Call ``move_message`` for one UID and report the destination folder."""
    imap = create_imap()
    try:
        login_imap(imap)
        move_message(imap, req.folder, req.uid, req.dest_folder)
        return {"status": "moved", "dest_folder": req.dest_folder}
    finally:
        # Always close the per-request connection, even on failure.
        imap.logout()


@app.post("/copy_message")
@run_imap_op
def copy_message_endpoint(req: CopyMessageRequest):
    """Call ``copy_message`` for one UID and report the destination folder."""
    imap = create_imap()
    try:
        login_imap(imap)
        copy_message(imap, req.folder, req.uid, req.dest_folder)
        return {"status": "copied", "dest_folder": req.dest_folder}
    finally:
        imap.logout()


@app.post("/apply_label")
@run_imap_op
def apply_label_endpoint(req: ManageLabelRequest):
    """Add the flag ``$Label_<label>`` to the message via ``add_flags``."""
    imap = create_imap()
    try:
        login_imap(imap)
        add_flags(imap, req.folder, req.uid, [f"$Label_{req.label}"])
        return {"status": "applied", "label": req.label}
    finally:
        imap.logout()


@app.post("/remove_label")
@run_imap_op
def remove_label_endpoint(req: ManageLabelRequest):
    """Remove the flag ``$Label_<label>`` from the message via ``remove_flags``."""
    imap = create_imap()
    try:
        login_imap(imap)
        remove_flags(imap, req.folder, req.uid, [f"$Label_{req.label}"])
        return {"status": "removed", "label": req.label}
    finally:
        imap.logout()


@app.post("/list_labels")
@run_imap_op
def list_labels_endpoint(req: ListLabelsRequest):
    """List labels for one message, or all available labels when no UID is given.

    With a truthy ``req.uid`` the labels come from ``get_labels`` and the
    leading ``$Label_`` prefix is stripped; otherwise the result of
    ``list_available_labels`` is returned unchanged.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        if req.uid:
            labels = get_labels(imap, req.folder, req.uid)
            # Strip only the first occurrence so label text containing the prefix survives.
            return {"labels": [label.replace("$Label_", "", 1) for label in labels]}
        labels = list_available_labels(imap)
        return {"labels": labels}
    finally:
        imap.logout()


@app.post("/get_unread_summary")
@run_imap_op
def get_unread_summary_endpoint(req: GetUnreadSummaryRequest):
    """Return the UNSEEN count for a folder plus previews of up to 20 messages.

    Each preview holds ``uid``, ``subject``, ``from``, ``date`` and the first
    300 characters of the plain-text body as ``snippet``.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        # NOTE(review): the folder is selected read-write and parse_message
        # fetches RFC822; per IMAP that may set \Seen on the previewed
        # messages -- confirm this side effect is intended for a summary.
        ensure_selected(imap, req.folder)
        status, data = imap.uid("SEARCH", None, "UNSEEN")
        if status != "OK" or not data or not data[0]:
            ids = []
        else:
            ids = data[0].split()
        uids = [int(u) for u in ids]
        count = len(uids)
        # Provide short previews for up to 20
        previews = []
        for uid in uids[:20]:
            msg = parse_message(imap, uid)
            previews.append({
                "uid": uid,
                "subject": msg["subject"],
                "from": msg["from"],
                "date": msg["date"],
                "snippet": (msg["body_plain"] or "")[:300],
            })
        return {
            "folder": req.folder,
            "unread_count": count,
            "previews": previews,
        }
    finally:
        imap.logout()


@app.post("/send_email")
def send_email_endpoint(req: SendEmailRequest):
    """Send a message through ``send_email``.

    Raises:
        HTTPException: status 500 carrying the error text if sending fails.
    """
    try:
        send_email(
            to=req.to,
            subject=req.subject,
            body=req.body,
            cc=req.cc,
            bcc=req.bcc,
            html=req.html,
            in_reply_to=req.in_reply_to,
            references=req.references,
        )
        return {"status": "sent"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/reply_to_message")
@run_imap_op
def reply_to_message_endpoint(req: ReplyToMessageRequest):
    """Reply to the message identified by ``req.folder`` / ``req.uid``.

    Recipients default to every address in the original "To" and "From"
    headers unless ``req.to`` is given. The subject gains a ``Re: `` prefix
    when it does not already start with one, and In-Reply-To / References are
    built from the original message so the reply stays in the same thread.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        # parse_message takes (imap, uid) and reads from the selected folder,
        # so the folder must be selected first.
        ensure_selected(imap, req.folder)
        msg = parse_message(imap, req.uid)
        # Determine reply recipients
        from email.utils import getaddresses
        if req.to:
            reply_to_list = req.to
        else:
            # default: reply to "To" and "From"
            reply_to_list = [
                addr for _, addr in getaddresses([msg["to"], msg["from"]]) if addr
            ]
        # A missing/empty subject must not crash the prefix check.
        subject = msg["subject"] or ""
        if not subject.lower().startswith("re:"):
            subject = f"Re: {subject}"
        # Thread headers
        in_reply_to = msg.get("message_id") or None
        refs = (msg.get("references") or "").strip()
        if in_reply_to and refs:
            refs = f"{refs} {in_reply_to}"
        elif in_reply_to:
            refs = in_reply_to
        send_email(
            to=reply_to_list,
            subject=subject,
            body=req.body,
            cc=req.cc,
            bcc=req.bcc,
            html=req.html,
            in_reply_to=in_reply_to,
            references=refs,
        )
        return {"status": "replied"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        imap.logout()


@app.post("/forward_message")
@run_imap_op
def forward_message_endpoint(req: ForwardMessageRequest):
    """Forward the message identified by ``req.folder`` / ``req.uid``.

    The outgoing body is ``req.note`` (if any), then ``req.body``, then the
    original plain-text body (falling back to the HTML body) quoted with
    ``> `` on every line. The subject gains a ``Fwd: `` prefix when it does
    not already start with one.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        # parse_message takes (imap, uid) and reads from the selected folder,
        # so the folder must be selected first.
        ensure_selected(imap, req.folder)
        msg = parse_message(imap, req.uid)
        # A missing/empty subject must not crash the prefix check.
        subject = msg["subject"] or ""
        if not subject.lower().startswith("fwd:"):
            subject = f"Fwd: {subject}"
        body = req.body or ""
        if req.note:
            body = f"{req.note}\n\n" + body
        # Include original message as quoted text
        original = msg["body_plain"] or (msg["body_html"] or "")
        if original:
            quoted = "\n".join("> " + line for line in original.splitlines())
            body = (body + "\n\n" + quoted).strip()
        send_email(
            to=req.to,
            subject=subject,
            body=body,
            cc=req.cc,
            bcc=req.bcc,
            html=req.html,
        )
        return {"status": "forwarded"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        imap.logout()


@app.post("/save_draft")
@run_imap_op
def save_draft_endpoint(req: SaveDraftRequest):
    """Return the result of ``save_draft_to_imap`` for the requested draft.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        return save_draft_to_imap(imap, req.folder, req.to, req.subject, req.body, req.html)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        imap.logout()


@app.post("/list_attachments")
@run_imap_op
def list_attachments_endpoint(req: ListAttachmentsRequest):
    """Return ``{"attachments": [...]}`` from ``list_attachments`` for one message.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        atts = list_attachments(imap, req.folder, req.uid)
        return {"attachments": atts}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        imap.logout()


@app.post("/download_attachment")
@run_imap_op
def download_attachment_endpoint(req: DownloadAttachmentRequest):
    """Return the result of ``download_attachment`` for one named attachment.

    Raises:
        HTTPException: status 404 when ``download_attachment`` raises
            ``ValueError``; status 500 for any other failure.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        info = download_attachment(imap, req.folder, req.uid, req.filename)
        return info
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        imap.logout()


@app.post("/search_attachments")
@run_imap_op
def search_attachments_endpoint(req: SearchAttachmentsRequest):
    """Return ``{"results": [...]}`` from ``search_attachments``.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        results = search_attachments(imap, req.folder, req.file_pattern, req.max_results)
        return {"results": results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        imap.logout()


@app.post("/schedule_send")
def schedule_send_endpoint(req: ScheduleSendRequest):
    """Register a message with ``add_scheduled_send`` and return its task id."""
    task_id = add_scheduled_send(
        to=req.to,
        subject=req.subject,
        body=req.body,
        cc=req.cc,
        bcc=req.bcc,
        html=req.html,
        send_at=req.send_at,
        in_reply_to=req.in_reply_to,
        references=req.references,
    )
    return {"status": "scheduled", "task_id": task_id}


@app.post("/export_conversation")
@run_imap_op
def export_conversation_endpoint(req: ExportConversationRequest):
    """Return ``{"export": ...}`` with the output of ``export_conversation``.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    imap = create_imap()
    try:
        login_imap(imap)
        text = export_conversation(imap, req.folder, req.message_uid, req.max_messages)
        return {"export": text}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        imap.logout()


@app.post("/conflict_check_search")
@run_imap_op
def conflict_check_search_endpoint(req: ConflictCheckSearchRequest):
    """Return ``{"results": [...]}`` from ``conflict_check_search``."""
    imap = create_imap()
    try:
        login_imap(imap)
        results = conflict_check_search(imap, req.terms, req.folders, req.max_results)
        return {"results": results}
    finally:
        imap.logout()


@app.post("/get_triage_config")
def get_triage_config_endpoint(_req: GetTriageConfigRequest):
    """Return the configured business description and personnel list."""
    return {
        "business_description": config.BUSINESS_DESCRIPTION,
        "personnel": config.personnel,
    }


# CardDAV endpoints
@app.post("/list_carddav_addressbooks")
def list_carddav_addressbooks_endpoint():
    """Return ``{"addressbooks": [...]}`` from ``list_carddav_addressbooks``.

    Raises:
        HTTPException: status 400 when the helper raises ``ValueError``;
            status 500 for any other failure.
    """
    try:
        abs_list = list_carddav_addressbooks()
        return {"addressbooks": abs_list}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/search_carddav_contacts")
def search_carddav_contacts_endpoint(req: SearchCardDAVContactsRequest):
    """Return ``{"contacts": [...]}`` from ``search_carddav_contacts``.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        contacts = search_carddav_contacts(req.addressbook_href, req.query)
        return {"contacts": contacts}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/get_carddav_contact")
def get_carddav_contact_endpoint(req: GetCardDAVContactRequest):
    """Return the result of ``get_carddav_contact`` for ``req.href``.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        c = get_carddav_contact(req.href)
        return c
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/create_carddav_contact")
def create_carddav_contact_endpoint(req: CreateCardDAVContactRequest):
    """Return the result of ``create_carddav_contact`` for the given vCard.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        r = create_carddav_contact(req.addressbook_href, req.vcard)
        return r
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/update_carddav_contact")
def update_carddav_contact_endpoint(req: UpdateCardDAVContactRequest):
    # Hand the href and vCard to update_carddav_contact and return its result;
    # any failure is reported as HTTP 500 with the error text.
    try:
        return update_carddav_contact(req.href, req.vcard)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/delete_carddav_contact")
def delete_carddav_contact_endpoint(req: DeleteCardDAVContactRequest):
    # Return whatever delete_carddav_contact reports for the href;
    # any failure is reported as HTTP 500 with the error text.
    try:
        return delete_carddav_contact(req.href)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


# CalDAV endpoints
@app.post("/list_caldav_calendars")
def list_caldav_calendars_endpoint():
    # ValueError from the helper maps to HTTP 400; every other failure to 500.
    try:
        return {"calendars": list_caldav_calendars()}
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/search_caldav_events")
def search_caldav_events_endpoint(req: SearchCalDAVEventsRequest):
    # Wrap the search_caldav_events result for the calendar and start/end window;
    # any failure is reported as HTTP 500 with the error text.
    try:
        return {"events": search_caldav_events(req.calendar_href, req.start, req.end)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/create_caldav_event")
def create_caldav_event_endpoint(req: CreateCalDAVEventRequest):
    # Return the create_caldav_event result for the calendar href and iCal text;
    # any failure is reported as HTTP 500 with the error text.
    try:
        return create_caldav_event(req.calendar_href, req.ical)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/update_caldav_event")
def update_caldav_event_endpoint(req: UpdateCalDAVEventRequest):
    # Return the update_caldav_event result for the event href and iCal text;
    # any failure is reported as HTTP 500 with the error text.
    try:
        return update_caldav_event(req.href, req.ical)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.post("/delete_caldav_event")
def delete_caldav_event_endpoint(req: DeleteCalDAVEventRequest):
    # Return whatever delete_caldav_event reports for the href;
    # any failure is reported as HTTP 500 with the error text.
    try:
        return delete_caldav_event(req.href)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


# -------------------- ENTRYPOINT --------------------
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=False)