YAMAGATA STUDIO
ブログ一覧に戻る
Agent Development KitでGmail用エージェントを作成する

Agent Development KitでGmail用エージェントを作成する

2025年05月20日
ADKGmail

ClineでとりあえずGmailエージェント動かすところまで作ったメモ。 全部の機能は試していないが一旦特定の件名を含むメールを抽出し要約するところまではできた。

リファクタリング一切してない前提(他のコードコピーしていじってるので不要な部分が結構あり)。 コーディネータから呼び出される前提。

langchainのコミュニティライブラリ使うと楽そうだったがまだバグありっぽいのでツール実装する。

https://github.com/google/adk-python/issues/310

事前準備

Gmail用のOAuthクライアントキーファイルやトークンファイルは事前に取得しておく(Web検索すると色々出てくる)。

トークン取得する際はCloudShellからだとできないので、ローカルPCで取得しておく。

Gmail tool用のプログラム

import os
import sys
import asyncio
import base64
from typing import Dict, Any, List, Optional
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dotenv import load_dotenv
import googleapiclient.discovery
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import json # pickleの代わりにjsonをインポート

# 環境変数の読み込み
load_dotenv()
parent_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(parent_dir)
sys.path.append(parent_dir)
sys.path.append(root_dir)
from shared_libraries import constants

# 権限スコープの設定
SCOPES = ['https://mail.google.com/']

# gmail setting
current_script_dir = os.path.dirname(os.path.abspath(__file__))
KEY_PATH_RELATIVE = constants.CREDENTIAL_PATH 
KEY_PATH_ABSOLUTE = os.path.abspath(os.path.join(current_script_dir, KEY_PATH_RELATIVE))
TOKEN_PATH_RELATIVE = constants.GCP_TOKEN_PATH
TOKEN_PATH_ABSOLUTE = os.path.abspath(os.path.join(current_script_dir, TOKEN_PATH_RELATIVE))

# グローバル変数
gmail_service = None
gmail_profile = None

def get_gmail_service():
    """Gmail APIサービスを取得する"""
    global gmail_service
    
    if gmail_service:
        return gmail_service
    
    # 認証情報の取得
    creds = None
    token_path = TOKEN_PATH_ABSOLUTE
    credentials_path = KEY_PATH_ABSOLUTE
    
    # トークンファイルが存在する場合は読み込み
    if os.path.exists(token_path):
        # creds = pickle.load(token) を Credentials.from_authorized_user_file に変更
        try:
            creds = Credentials.from_authorized_user_file(token_path, SCOPES)
        except json.JSONDecodeError:
            # JSON形式でない場合や破損している場合はNoneにする
            creds = None
        except ValueError:
             # スコープが一致しない場合などもNoneにする
            creds = None

    # 認証情報が無効または期限切れの場合は更新
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not os.path.exists(credentials_path):
                return {
                    "status": "error",
                    "error_message": f"credentials.jsonファイルが見つかりません: {credentials_path}"
                }
            
            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
            creds = flow.run_local_server(port=0)

        # トークンを保存 (JSON形式で保存)
        with open(token_path, 'w') as token:
            token.write(creds.to_json())

    # Gmailサービスを構築
    gmail_service = googleapiclient.discovery.build('gmail', 'v1', credentials=creds)
    
    return gmail_service

async def initialize_gmail_client():
    """Gmailクライアントを初期化して接続する"""
    global gmail_service, gmail_profile
    
    try:
        # 非同期でGmailサービスを初期化
        loop = asyncio.get_event_loop()
        gmail_service = await loop.run_in_executor(None, get_gmail_service)
        
        if isinstance(gmail_service, dict) and gmail_service.get("status") == "error":
            return gmail_service
        
        # プロフィール情報を取得
        profile_result = await loop.run_in_executor(
            None, 
            lambda: gmail_service.users().getProfile(userId='me').execute()
        )
        
        gmail_profile = profile_result
        
        return {
            "status": "success",
            "message": "Gmailクライアントが初期化されました",
            "email": profile_result.get('emailAddress', '')
        }
    
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"Gmailクライアントの初期化中にエラーが発生しました: {str(e)}"
        }

async def search_emails(query: str, max_results: int = 10) -> Dict[str, Any]:
    """
    Gmailでメールを検索する
    
    Args:
        query (str): 検索クエリ (Gmail検索構文に従う)
        max_results (int): 取得する最大結果数
        
    Returns:
        Dict[str, Any]: 検索結果と状態を含む辞書
    """
    if not gmail_service:
        return {
            "status": "error",
            "error_message": "Gmailクライアントが初期化されていません。先にgmail_init()を実行してください。"
        }
    
    try:
        loop = asyncio.get_event_loop()
        
        # メール一覧を検索
        search_result = await loop.run_in_executor(
            None,
            lambda: gmail_service.users().messages().list(
                userId='me', 
                q=query,
                maxResults=max_results
            ).execute()
        )
        
        # 検索結果がない場合
        if 'messages' not in search_result:
            return {
                "status": "success",
                "message_count": 0,
                "messages": []
            }
        
        messages = []
        
        # 各メッセージの詳細を取得
        for msg in search_result['messages']:
            msg_detail = await loop.run_in_executor(
                None,
                lambda: gmail_service.users().messages().get(
                    userId='me', 
                    id=msg['id'],
                    format='metadata',
                    metadataHeaders=['From', 'To', 'Subject', 'Date']
                ).execute()
            )
            
            # ヘッダー情報を抽出
            headers = msg_detail.get('payload', {}).get('headers', [])
            email_data = {
                "id": msg['id'],
                "thread_id": msg_detail.get('threadId', ''),
                "snippet": msg_detail.get('snippet', ''),
                "date": '',
                "from": '',
                "to": '',
                "subject": ''
            }
            
            for header in headers:
                if header['name'] == 'From':
                    email_data['from'] = header['value']
                elif header['name'] == 'To':
                    email_data['to'] = header['value']
                elif header['name'] == 'Subject':
                    email_data['subject'] = header['value']
                elif header['name'] == 'Date':
                    email_data['date'] = header['value']
            
            messages.append(email_data)
        
        return {
            "status": "success",
            "message_count": len(messages),
            "messages": messages
        }
    
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"メールの検索中にエラーが発生しました: {str(e)}"
        }

async def get_email_content(email_id: str) -> Dict[str, Any]:
    """
    メールの内容を取得する
    
    Args:
        email_id (str): メールID
        
    Returns:
        Dict[str, Any]: メール内容と状態を含む辞書
    """
    if not gmail_service:
        return {
            "status": "error",
            "error_message": "Gmailクライアントが初期化されていません。先にgmail_init()を実行してください。"
        }
    
    try:
        loop = asyncio.get_event_loop()
        
        # メールの詳細を取得
        msg = await loop.run_in_executor(
            None,
            lambda: gmail_service.users().messages().get(
                userId='me', 
                id=email_id,
                format='full'
            ).execute()
        )
        
        # ヘッダー情報を抽出
        headers = msg.get('payload', {}).get('headers', [])
        email_data = {
            "id": email_id,
            "thread_id": msg.get('threadId', ''),
            "date": '',
            "from": '',
            "to": '',
            "cc": '',
            "subject": '',
            "body_text": '',
            "body_html": ''
        }
        
        for header in headers:
            if header['name'] == 'From':
                email_data['from'] = header['value']
            elif header['name'] == 'To':
                email_data['to'] = header['value']
            elif header['name'] == 'Cc':
                email_data['cc'] = header['value']
            elif header['name'] == 'Subject':
                email_data['subject'] = header['value']
            elif header['name'] == 'Date':
                email_data['date'] = header['value']
        
        # メール本文を取得
        email_data['body_text'] = _get_message_text(msg)
        email_data['body_html'] = _get_message_html(msg)
        
        return {
            "status": "success",
            "email": email_data
        }
    
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"メール内容の取得中にエラーが発生しました: {str(e)}"
        }

async def create_draft(to: str, subject: str, body: str, cc: str = "", bcc: str = "") -> Dict[str, Any]:
    """
    メールのドラフトを作成する
    
    Args:
        to (str): 宛先メールアドレス (複数の場合はカンマ区切り)
        subject (str): 件名
        body (str): 本文
        cc (str, optional): CCアドレス (複数の場合はカンマ区切り)
        bcc (str, optional): BCCアドレス (複数の場合はカンマ区切り)
        
    Returns:
        Dict[str, Any]: 作成結果と状態を含む辞書
    """
    if not gmail_service:
        return {
            "status": "error",
            "error_message": "Gmailクライアントが初期化されていません。先にgmail_init()を実行してください。"
        }
    
    try:
        # MIMEメッセージを作成
        message = MIMEMultipart()
        message['to'] = to
        message['subject'] = subject
        
        if cc:
            message['cc'] = cc
        if bcc:
            message['bcc'] = bcc
        
        # 本文を追加
        msg = MIMEText(body)
        message.attach(msg)
        
        # メッセージをエンコード
        encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        
        loop = asyncio.get_event_loop()
        
        # ドラフトを作成
        draft = await loop.run_in_executor(
            None,
            lambda: gmail_service.users().drafts().create(
                userId='me',
                body={'message': {'raw': encoded_message}}
            ).execute()
        )
        
        return {
            "status": "success",
            "message": "ドラフトが作成されました",
            "draft_id": draft.get('id', ''),
            "to": to,
            "subject": subject
        }
    
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"ドラフト作成中にエラーが発生しました: {str(e)}"
        }

async def send_email(to: str, subject: str, body: str, cc: str = "", bcc: str = "") -> Dict[str, Any]:
    """
    メールを送信する
    
    Args:
        to (str): 宛先メールアドレス (複数の場合はカンマ区切り)
        subject (str): 件名
        body (str): 本文
        cc (str, optional): CCアドレス (複数の場合はカンマ区切り)
        bcc (str, optional): BCCアドレス (複数の場合はカンマ区切り)
        
    Returns:
        Dict[str, Any]: 送信結果と状態を含む辞書
    """
    if not gmail_service:
        return {
            "status": "error",
            "error_message": "Gmailクライアントが初期化されていません。先にgmail_init()を実行してください。"
        }
    
    try:
        # MIMEメッセージを作成
        message = MIMEMultipart()
        message['to'] = to
        message['subject'] = subject
        
        if cc:
            message['cc'] = cc
        if bcc:
            message['bcc'] = bcc
        
        # 本文を追加
        msg = MIMEText(body)
        message.attach(msg)
        
        # メッセージをエンコード
        encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        
        loop = asyncio.get_event_loop()
        
        # メールを送信
        sent_message = await loop.run_in_executor(
            None,
            lambda: gmail_service.users().messages().send(
                userId='me',
                body={'raw': encoded_message}
            ).execute()
        )
        
        return {
            "status": "success",
            "message": "メールが送信されました",
            "message_id": sent_message.get('id', ''),
            "to": to,
            "subject": subject
        }
    
    except Exception as e:
        return {
            "status": "error",
            "error_message": f"メール送信中にエラーが発生しました: {str(e)}"
        }

def _get_message_text(message: Dict[str, Any]) -> str:
    """メッセージからテキスト形式の本文を抽出する"""
    if 'payload' not in message:
        return ""
    
    # メッセージが複数パートの場合
    if 'parts' in message['payload']:
        for part in message['payload']['parts']:
            # メッセージパートがテキスト形式の場合
            if part['mimeType'] == 'text/plain':
                if 'data' in part['body']:
                    return base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
            # ネストされたパートがある場合は再帰的に探索
            elif 'parts' in part:
                for sub_part in part['parts']:
                    if sub_part['mimeType'] == 'text/plain':
                        if 'data' in sub_part['body']:
                            return base64.urlsafe_b64decode(sub_part['body']['data']).decode('utf-8')
    
    # メッセージが単一パートの場合
    elif message['payload']['mimeType'] == 'text/plain':
        if 'data' in message['payload']['body']:
            return base64.urlsafe_b64decode(message['payload']['body']['data']).decode('utf-8')
    
    return ""

def _get_message_html(message: Dict[str, Any]) -> str:
    """メッセージからHTML形式の本文を抽出する"""
    if 'payload' not in message:
        return ""
    
    # メッセージが複数パートの場合
    if 'parts' in message['payload']:
        for part in message['payload']['parts']:
            # メッセージパートがHTML形式の場合
            if part['mimeType'] == 'text/html':
                if 'data' in part['body']:
                    return base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
            # ネストされたパートがある場合は再帰的に探索
            elif 'parts' in part:
                for sub_part in part['parts']:
                    if sub_part['mimeType'] == 'text/html':
                        if 'data' in sub_part['body']:
                            return base64.urlsafe_b64decode(sub_part['body']['data']).decode('utf-8')
    
    # メッセージが単一パートの場合
    elif message['payload']['mimeType'] == 'text/html':
        if 'data' in message['payload']['body']:
            return base64.urlsafe_b64decode(message['payload']['body']['data']).decode('utf-8')
    
    return ""

エージェントの方のプログラム

# Add nest_asyncio to allow running asyncio.run within an existing event loop
import os
import sys
import asyncio
from dotenv import load_dotenv
from google.adk import Agent, Runner
from google.adk.tools import FunctionTool
from google.adk.sessions import InMemorySessionService
from google.genai import types
from shared_libraries.error_handling import (
    before_agent_callback,
    after_agent_callback,
)

load_dotenv('.env')

parent_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(parent_dir)
sys.path.append(parent_dir)
sys.path.append(root_dir)
from shared_libraries import constants
from . import prompt
#import prompt

# gmail_tool.pyからツール関数をインポート
from .gmail_tool import (
    initialize_gmail_client,
    search_emails,
    get_email_content,
    create_draft,
    send_email
)

# コルーチンを実行するヘルパー関数
def run_coroutine(coroutine):
    """コルーチンを実行してその結果を返す"""
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coroutine)

# ====== ADKツールの定義 ======

# 非同期関数をラップする同期関数
def gmail_init():
    """
    Gmailクライアントを初期化するツール
    
    Returns:
        Dict[str, Any]: 初期化結果と状態を含む辞書
    """
    return run_coroutine(initialize_gmail_client())

def gmail_search(query: str, max_results: int = 10):
    """
    Gmailでメールを検索するツール
    
    Args:
        query (str): 検索クエリ (Gmail検索構文に従う)
        max_results (int, optional): 取得する最大結果数、デフォルトは10
    
    Returns:
        Dict[str, Any]: 検索結果と状態を含む辞書
    """
    return run_coroutine(search_emails(query, max_results))

def gmail_read(email_id: str):
    """
    メールの内容を読み取るツール
    
    Args:
        email_id (str): メールID
    
    Returns:
        Dict[str, Any]: メール内容と状態を含む辞書
    """
    return run_coroutine(get_email_content(email_id))

def gmail_draft(to: str, subject: str, body: str, cc: str = "", bcc: str = ""):
    """
    メールのドラフトを作成するツール
    
    Args:
        to (str): 宛先メールアドレス (複数の場合はカンマ区切り)
        subject (str): 件名
        body (str): 本文
        cc (str, optional): CCアドレス (複数の場合はカンマ区切り)
        bcc (str, optional): BCCアドレス (複数の場合はカンマ区切り)
    
    Returns:
        Dict[str, Any]: 作成結果と状態を含む辞書
    """
    return run_coroutine(create_draft(to, subject, body, cc, bcc))

def gmail_send(to: str, subject: str, body: str, cc: str = "", bcc: str = ""):
    """
    メールを送信するツール
    
    Args:
        to (str): 宛先メールアドレス (複数の場合はカンマ区切り)
        subject (str): 件名
        body (str): 本文
        cc (str, optional): CCアドレス (複数の場合はカンマ区切り)
        bcc (str, optional): BCCアドレス (複数の場合はカンマ区切り)
    
    Returns:
        Dict[str, Any]: 送信結果と状態を含む辞書
    """
    return run_coroutine(send_email(to, subject, body, cc, bcc))

# ツールのインスタンス化
gmail_init_tool = FunctionTool(func=gmail_init)
gmail_search_tool = FunctionTool(func=gmail_search)
gmail_read_tool = FunctionTool(func=gmail_read)
gmail_draft_tool = FunctionTool(func=gmail_draft)
gmail_send_tool = FunctionTool(func=gmail_send)

# エージェントの設定
AGENT_MODEL = constants.MODEL_GEMINI_2_5_PRO # Starting with a powerful Gemini model
APP_NAME = "gmail_app"
USER_ID = "1234"
SESSION_ID = "session1234"

try:    
    # Define Agent with the wrapped tool
    gmail_root_agent = Agent(
        model=AGENT_MODEL, # Adjust model name if needed
        name=prompt.AGENT_NAME,
        description=prompt.DESCRIPTION,
        instruction=prompt.PROMPT,
        tools=[
            gmail_init_tool,
            gmail_search_tool,
            gmail_read_tool,
            gmail_draft_tool,
            gmail_send_tool
        ],
        before_agent_callback=before_agent_callback,
        after_agent_callback=after_agent_callback,
    )

    session_service = InMemorySessionService()
    session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
    runner = Runner(
        agent=gmail_root_agent, 
        app_name=APP_NAME, 
        session_service=session_service)

    print("Agent initialization complete. 'root_agent' is now defined.")
  
except Exception as e:
    print(f"Error during agent initialization: {e}")
    # Raise the exception to prevent the module from loading incorrectly
    raise