diff --git a/.gitignore b/.gitignore
index b42ff18..392fcbd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,3 +73,10 @@ pipeline_ui/backend/uploads/*
 # Frontend (Node.js)
 pipeline_ui/frontend/node_modules/*
 pipeline_ui/frontend/package-lock.json
+
+
+I_origin_0/*
+I_origin_1/*
+I_origin_2/*
+
+output/*
\ No newline at end of file
diff --git a/generate_synthetic_table/flow.py b/generate_synthetic_table/flow.py
index ac5b99c..0177147 100644
--- a/generate_synthetic_table/flow.py
+++ b/generate_synthetic_table/flow.py
@@ -99,23 +99,47 @@ def _call_llm(
     if return_token_usage:
         # Extract token usage from response metadata
         token_usage = 0
-        if hasattr(response, 'response_metadata'):
-            usage_metadata = response.response_metadata.get('usage', {})
-            # OpenAI/Gemini format
+
+        logger.info(f"=== TOKEN DEBUG START ===")
+        logger.info(f"Response type: {type(response)}")
+        logger.info(f"Has usage_metadata: {hasattr(response, 'usage_metadata')}")
+        logger.info(f"Has response_metadata: {hasattr(response, 'response_metadata')}")
+
+        # Try response.usage_metadata first (Gemini pool format)
+        if hasattr(response, 'usage_metadata') and response.usage_metadata:
+            usage = response.usage_metadata
+            logger.info(f"usage_metadata type: {type(usage)}")
+            logger.info(f"usage_metadata value: {usage}")
+
+            if isinstance(usage, dict):
+                token_usage = usage.get('total_tokens', 0)
+                if not token_usage:
+                    token_usage = usage.get('input_tokens', 0) + usage.get('output_tokens', 0)
+            else:
+                token_usage = getattr(usage, 'total_tokens', 0)
+                if not token_usage:
+                    token_usage = getattr(usage, 'input_tokens', 0) + getattr(usage, 'output_tokens', 0)
+
+            logger.info(f"Extracted token_usage from usage_metadata: {token_usage}")
+
+        # Fallback: response.response_metadata (OpenAI format)
+        if not token_usage and hasattr(response, 'response_metadata'):
+            metadata = response.response_metadata
+            logger.info(f"response_metadata: {metadata}")
+            usage_metadata = metadata.get('usage', {})
+            logger.info(f"usage from response_metadata: {usage_metadata}")
+
             token_usage = usage_metadata.get('total_tokens', 0)
-            # Fallback: prompt_tokens + completion_tokens
             if not token_usage:
                 token_usage = usage_metadata.get('prompt_tokens', 0) + usage_metadata.get('completion_tokens', 0)
-            # Fallback: input_tokens + output_tokens
             if not token_usage:
                 token_usage = usage_metadata.get('input_tokens', 0) + usage_metadata.get('output_tokens', 0)
-        # Alternative: usage_metadata attribute (dict or object)
-        if not token_usage and hasattr(response, 'usage_metadata') and response.usage_metadata:
-            usage = response.usage_metadata
-            if isinstance(usage, dict):
-                token_usage = usage.get('total_tokens', 0) or (usage.get('input_tokens', 0) + usage.get('output_tokens', 0))
-            else:
-                token_usage = getattr(usage, 'total_tokens', 0) or (getattr(usage, 'input_tokens', 0) + getattr(usage, 'output_tokens', 0))
+
+            logger.info(f"Extracted token_usage from response_metadata: {token_usage}")
+
+        logger.info(f"Final token_usage: {token_usage}")
+        logger.info(f"=== TOKEN DEBUG END ===")
+
         return response_content, token_usage
     return response_content
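For reference, the extraction order above (`usage_metadata` first, then the `response_metadata['usage']` fallbacks) can be exercised in isolation. A minimal sketch assuming a LangChain-style message object; the `extract_token_usage` helper and the `SimpleNamespace` stand-in are illustrative, not part of the codebase:

```python
from types import SimpleNamespace

def extract_token_usage(response) -> int:
    """Mirror of the fallback chain above: usage_metadata first, then response_metadata."""
    # 1) response.usage_metadata (dict or attribute object), as the Gemini pool produces
    usage = getattr(response, "usage_metadata", None)
    if usage:
        get = usage.get if isinstance(usage, dict) else (lambda k, d=0: getattr(usage, k, d))
        total = get("total_tokens", 0) or get("input_tokens", 0) + get("output_tokens", 0)
        if total:
            return total
    # 2) response.response_metadata["usage"], as OpenAI-style models produce
    meta = getattr(response, "response_metadata", {}) or {}
    u = meta.get("usage", {})
    return (
        u.get("total_tokens", 0)
        or u.get("prompt_tokens", 0) + u.get("completion_tokens", 0)
        or u.get("input_tokens", 0) + u.get("output_tokens", 0)
    )

# Hypothetical stand-in for a message carrying only usage_metadata
fake = SimpleNamespace(usage_metadata={"input_tokens": 120, "output_tokens": 80},
                       response_metadata={})
assert extract_token_usage(fake) == 200
```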
@@ -641,7 +665,11 @@ def _node(state: TableState) -> TableState:
             errors.append(f"QA prompt missing placeholder: {e}")
             return {**state, "errors": errors}
 
-        response_text = _call_llm(llm, prompt)
+        response_text, token_usage = _call_llm(llm, prompt, return_token_usage=True)
+
+        # Debug log for token usage
+        logger.info(f"QA generation token usage: {token_usage}")
+
         response_json = robust_json_parse(response_text)
 
         qa_results = []
@@ -650,7 +678,8 @@ def _node(state: TableState) -> TableState:
         else:
             logger.warning("QA generation did not return valid JSON or 'qa_pairs' key.")
 
-        return {**state, "qa_results": qa_results}
+        logger.info(f"Returning token_usage: {token_usage}")
+        return {**state, "qa_results": qa_results, "token_usage": token_usage}
 
     return _node
@@ -685,6 +714,10 @@ def _node(state: TableState) -> TableState:
             prompt = prompt_template
 
         response_text, token_usage = _call_llm(llm, prompt, image_urls=image_data_urls, return_token_usage=True)
+
+        # Debug log for token usage
+        logger.info(f"QA generation token usage: {token_usage}")
+
         response_json = robust_json_parse(response_text)
 
         qa_results = []
@@ -693,6 +726,7 @@ def _node(state: TableState) -> TableState:
         else:
             logger.warning("QA generation from image did not return valid JSON or 'qa_pairs' key.")
 
+        logger.info(f"Returning token_usage: {token_usage}")
         return {**state, "qa_results": qa_results, "token_usage": token_usage}
 
     return _node
@@ -877,6 +911,8 @@ def run_synthetic_table_flow(
     temperature: float = 0.2,
     base_url: str | None = None,
     config_path: str | None = None,
+    azure_deployment: str | None = None,
+    azure_endpoint: str | None = None,
     qa_only: bool = False,
     image_paths: List[str] | None = None,
     domain: str | None = None,
@@ -891,11 +927,13 @@ def run_synthetic_table_flow(
 
     Args:
         image_path: Path to the input image or HTML file
-        provider: LLM provider (openai, gemini, gemini_pool, claude, vllm)
+        provider: LLM provider (openai, azure, gemini, gemini_pool, claude, vllm)
        model: Model name
         temperature: Sampling temperature
         base_url: Custom base URL for vLLM
         config_path: Config path for gemini_pool
+        azure_deployment: Azure OpenAI deployment name
+        azure_endpoint: Azure OpenAI endpoint URL
         qa_only: If True, skip synthetic data generation and only generate QA from image
         image_paths: Optional list of image paths for multi-image processing
         domain: Optional domain for prompt customization (e.g. 'public')
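Together with the factory changes below, the new keyword arguments let callers route the whole flow through Azure from code. A minimal sketch; the input path, deployment name, and endpoint are placeholders, and the returned `TableState` is treated as a dict, as the node code above suggests:

```python
from generate_synthetic_table.flow import run_synthetic_table_flow

# Placeholder path, deployment, and endpoint; qa_only skips synthetic generation.
state = run_synthetic_table_flow(
    "samples/table.png",
    provider="azure",
    model="gpt-4o-mini",
    temperature=0.2,
    azure_deployment="my-gpt-4o-mini",
    azure_endpoint="https://example.openai.azure.com/",
    qa_only=True,
    domain="insurance",
)
print(len(state.get("qa_results", [])), state.get("token_usage", 0))
```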
diff --git a/generate_synthetic_table/llm_factory.py b/generate_synthetic_table/llm_factory.py
index f5acd00..f0cdac8 100644
--- a/generate_synthetic_table/llm_factory.py
+++ b/generate_synthetic_table/llm_factory.py
@@ -1,10 +1,12 @@
 from __future__ import annotations
 
 import os
+import yaml
+from pathlib import Path
 from typing import Optional
 
 from langchain_core.language_models import BaseChatModel
-from langchain_openai import ChatOpenAI
+from langchain_openai import ChatOpenAI, AzureChatOpenAI
 from langchain_google_genai import ChatGoogleGenerativeAI
 from langchain_anthropic import ChatAnthropic
@@ -12,6 +14,29 @@ from polling_gemini import GeminiPoolChatModel, create_gemini_chat_model
 
 
+def _load_azure_config_from_yaml(config_path: Optional[str] = None) -> dict:
+    """Load Azure OpenAI configuration from gemini_keys.yaml file."""
+    if not config_path:
+        config_path = "apis/gemini_keys.yaml"
+
+    config_file = Path(config_path)
+    if not config_file.exists():
+        return {}
+
+    try:
+        with open(config_file, 'r', encoding='utf-8') as f:
+            config = yaml.safe_load(f)
+
+        return {
+            'api_key': config.get('AZURE_OPENAI_API_KEY'),
+            'endpoint': config.get('AZURE_OPENAI_ENDPOINT'),
+            'api_version': config.get('AZURE_OPENAI_API_VERSION'),
+            'deployment': config.get('AZURE_OPENAI_DEPLOYMENT_NAME'),
+        }
+    except Exception:
+        return {}
+
+
 def get_llm(
     provider: str,
     model: str,
@@ -19,17 +44,21 @@ def get_llm(
     base_url: Optional[str] = None,
     api_key: Optional[str] = None,
     config_path: Optional[str] = None,
+    azure_deployment: Optional[str] = None,
+    azure_endpoint: Optional[str] = None,
 ) -> BaseChatModel:
     """
     Factory to create a Chat Model based on the provider.
 
     Args:
-        provider: 'openai', 'gemini', 'gemini_pool', 'claude', or 'vllm'
+        provider: 'openai', 'azure', 'gemini', 'gemini_pool', 'claude', or 'vllm'
         model: Model name (e.g., 'gpt-4', 'gemini-1.5-flash', 'claude-sonnet-4-20250514')
         temperature: Sampling temperature
         base_url: Optional base URL for vLLM or custom OpenAI endpoints
         api_key: Optional API key override
         config_path: Optional config path for gemini_pool (apis/gemini_keys.yaml)
+        azure_deployment: Azure OpenAI deployment name (required for azure provider)
+        azure_endpoint: Azure OpenAI endpoint URL (required for azure provider)
 
     Returns:
         A configured LangChain Chat Model
@@ -66,6 +95,30 @@ def get_llm(
             temperature=temperature,
         )
 
+    elif provider == "azure":
+        # Azure OpenAI
+        # Precedence: CLI parameters > yaml file at config_path > environment variables
+        yaml_config = _load_azure_config_from_yaml(config_path)
+
+        azure_key = api_key or yaml_config.get('api_key') or os.getenv("AZURE_OPENAI_API_KEY")
+        azure_ep = azure_endpoint or yaml_config.get('endpoint') or os.getenv("AZURE_OPENAI_ENDPOINT")
+        azure_dep = azure_deployment or yaml_config.get('deployment') or model
+        azure_ver = yaml_config.get('api_version') or os.getenv("AZURE_OPENAI_API_VERSION") or "2024-02-15-preview"
+
+        if not azure_key or not azure_ep:
+            raise ValueError(
+                "Azure OpenAI requires AZURE_OPENAI_API_KEY and AZURE_OPENAI_ENDPOINT. "
+                "Set via environment variables, --azure-* arguments, or in apis/gemini_keys.yaml."
+            )
+
+        return AzureChatOpenAI(
+            azure_deployment=azure_dep,
+            azure_endpoint=azure_ep,
+            api_key=azure_key,
+            api_version=azure_ver,
+            temperature=temperature,
+        )
+
     elif provider == "claude":
         # Anthropic Claude API
         # Uses the ANTHROPIC_API_KEY environment variable or the api_key parameter
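With the azure branch in place, callers only need a provider string: credentials resolve in the precedence order documented above, and the same four keys (AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_VERSION, AZURE_OPENAI_DEPLOYMENT_NAME) may instead live in apis/gemini_keys.yaml. A minimal sketch with placeholder values:

```python
import os
from generate_synthetic_table.llm_factory import get_llm

# Environment variables are the last stop in the precedence chain
# (explicit arguments > apis/gemini_keys.yaml > environment); values are placeholders.
os.environ.setdefault("AZURE_OPENAI_API_KEY", "<key>")
os.environ.setdefault("AZURE_OPENAI_ENDPOINT", "https://example.openai.azure.com/")

# With no azure_deployment argument and no yaml entry, the model name doubles
# as the deployment name, per the `azure_dep = ... or model` fallback above.
llm = get_llm(provider="azure", model="gpt-4o-mini", temperature=0.2)
print(type(llm).__name__)  # AzureChatOpenAI
```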
diff --git a/generate_synthetic_table/prompts/insurance.yaml b/generate_synthetic_table/prompts/insurance.yaml
index fae3ff8..ce0c759 100644
--- a/generate_synthetic_table/prompts/insurance.yaml
+++ b/generate_synthetic_table/prompts/insurance.yaml
@@ -15,6 +15,7 @@ generate_qa: |
     3. **Language:** The questions and answers MUST be in Korean.
     4. **Reasoning Language:** reasoning_annotation MUST be written in English and MUST be a single string (not a list).
     5. **Accuracy:** Ensure all answers are factually correct based on the data in the table.
+    6. **Context:** Include the specific table cells or rows that were used to answer the question.
 
     **Output Format (JSON):**
     {{
@@ -22,7 +23,9 @@ generate_qa: |
         {{
             "question": "...",
             "answer": "...",
-            "type": "lookup"
+            "type": "lookup",
+            "reasoning_annotation": "Detailed explanation of how the answer was derived (in English, single string)",
+            "context": "Specific table cells/rows used (e.g., 'Row 2, Column 3: Premium amount')"
         }},
         ...
     ]
@@ -43,6 +46,7 @@ generate_qa_from_image: |
     4. **Language:** The questions and answers MUST be in Korean.
     5. **Reasoning Language:** reasoning_annotation MUST be written in English and MUST be a single string (not a list).
     6. **Accuracy:** Ensure 100% factual correctness.
+    7. **Context:** Include specific cell references or table sections used to derive the answer.
 
     **Output Format (JSON):**
     {{
@@ -50,7 +54,9 @@ generate_qa_from_image: |
         {{
             "question": "...",
             "answer": "...",
-            "type": "lookup"
+            "type": "lookup",
+            "reasoning_annotation": "Step-by-step reasoning process in English (single string)",
+            "context": "Table location used (e.g., 'Premium column, Row 3')"
         }},
         ...
     ]
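Since both prompts now demand `reasoning_annotation` and `context` on every pair, downstream parsing can enforce them before the pairs reach the output file. A minimal validation sketch for the parsed JSON; the helper name and the sample pair are illustrative:

```python
REQUIRED_KEYS = {"question", "answer", "type", "reasoning_annotation", "context"}

def validate_qa_pairs(response_json: dict) -> list[dict]:
    """Keep only QA pairs that carry every field the updated prompts require."""
    pairs = response_json.get("qa_pairs", [])
    return [p for p in pairs if REQUIRED_KEYS <= set(p)]

# Illustrative sample matching the output format above (question/answer in Korean).
sample = {"qa_pairs": [{
    "question": "2행의 보험료는 얼마인가요?",
    "answer": "50,000원",
    "type": "lookup",
    "reasoning_annotation": "Looked up the premium cell in row 2.",
    "context": "Row 2, Column 3: Premium amount",
}]}
assert len(validate_qa_pairs(sample)) == 1
```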
diff --git a/generate_synthetic_table/runner.py b/generate_synthetic_table/runner.py
index c68412a..1d87faf 100644
--- a/generate_synthetic_table/runner.py
+++ b/generate_synthetic_table/runner.py
@@ -53,6 +53,15 @@ def _auto_detect_domain(name: str) -> str | None:
     return None
 
 
+def _save_upload_log(image_path: str) -> None:
+    """Save uploaded image path to log file."""
+    log_file = Path("tests/choi/upload_fin.txt")
+    log_file.parent.mkdir(parents=True, exist_ok=True)
+
+    with open(log_file, 'a', encoding='utf-8') as f:
+        f.write(f"{image_path}\n")
+
+
 def build_arg_parser() -> argparse.ArgumentParser:
     """Create the common argument parser used by CLI entrypoints."""
 
@@ -67,8 +76,8 @@ def build_arg_parser() -> argparse.ArgumentParser:
     parser.add_argument(
         "--provider",
         default="openai",
-        choices=["openai", "gemini", "gemini_pool", "claude", "vllm"],
-        help="LLM provider to use (default: openai). gemini_pool uses API key rotation from apis/gemini_keys.yaml. claude uses ANTHROPIC_API_KEY.",
+        choices=["openai", "azure", "gemini", "gemini_pool", "claude", "vllm"],
+        help="LLM provider to use (default: openai). azure uses Azure OpenAI. gemini_pool uses API key rotation from apis/gemini_keys.yaml. claude uses ANTHROPIC_API_KEY.",
     )
     parser.add_argument(
         "--config-path",
@@ -79,6 +88,14 @@ def build_arg_parser() -> argparse.ArgumentParser:
         "--base-url",
         help="Custom Base URL for vLLM or OpenAI-compatible endpoints",
     )
+    parser.add_argument(
+        "--azure-deployment",
+        help="Azure OpenAI deployment name (required for azure provider)",
+    )
+    parser.add_argument(
+        "--azure-endpoint",
+        help="Azure OpenAI endpoint URL (required for azure provider)",
+    )
     parser.add_argument(
         "--model",
         default="gpt-4o-mini",
@@ -203,6 +220,11 @@ def build_arg_parser() -> argparse.ArgumentParser:
         type=Path,
         help="Path to config file with Notion API key and database IDs (default: apis/gemini_keys.yaml)",
     )
+    parser.add_argument(
+        "--upload-log",
+        action="store_true",
+        help="Save uploaded image paths to tests/choi/upload_fin.txt for tracking",
+    )
 
     return parser
 
@@ -214,6 +236,8 @@ def run_flow_for_image(
     temperature: float = 0.2,
     base_url: str | None = None,
     config_path: str | None = None,
+    azure_deployment: str | None = None,
+    azure_endpoint: str | None = None,
     qa_only: bool = False,
     domain: str | None = None,
     # Checkpointing options
@@ -231,6 +255,8 @@ def run_flow_for_image(
         temperature: Sampling temperature
         base_url: Custom base URL
         config_path: Config path for gemini_pool
+        azure_deployment: Azure OpenAI deployment name
+        azure_endpoint: Azure OpenAI endpoint URL
         qa_only: Generate QA only without synthetic data
         domain: Domain for prompt customization
         enable_checkpointing: Enable checkpointing for resumable execution
@@ -248,13 +274,14 @@ def run_flow_for_image(
     if provider == "openai" and not os.getenv("OPENAI_API_KEY"):
         msg = "OPENAI_API_KEY is not set. Add it to a .env file or your environment."
         raise RuntimeError(msg)
+    # azure can also read its keys from the yaml file, so it is not checked here (get_llm checks)
     if provider == "gemini" and not os.getenv("GOOGLE_API_KEY"):
         msg = "GOOGLE_API_KEY is not set. Add it to a .env file or your environment."
         raise RuntimeError(msg)
     if provider == "claude" and not os.getenv("ANTHROPIC_API_KEY"):
         msg = "ANTHROPIC_API_KEY is not set. Add it to a .env file or your environment."
         raise RuntimeError(msg)
-    # gemini_pool loads its keys from apis/gemini_keys.yaml, so no environment variable check is needed
+    # gemini_pool and azure load their keys from apis/gemini_keys.yaml, so no environment variable check is needed
 
     return run_synthetic_table_flow(
         str(image),
@@ -263,6 +290,8 @@
         temperature=temperature,
         base_url=base_url,
         config_path=config_path,
+        azure_deployment=azure_deployment,
+        azure_endpoint=azure_endpoint,
         qa_only=qa_only,
         domain=domain,
         enable_checkpointing=enable_checkpointing,
@@ -401,6 +430,8 @@ def run_with_args(args: argparse.Namespace) -> TableState | Dict | None:
             temperature=args.temperature,
             base_url=args.base_url,
             config_path=str(args.config_path) if args.config_path else None,
+            azure_deployment=getattr(args, 'azure_deployment', None),
+            azure_endpoint=getattr(args, 'azure_endpoint', None),
             qa_only=getattr(args, 'qa_only', False),
             output_dir=getattr(args, 'output_dir', None),
             max_workers=getattr(args, 'max_workers', 3),
@@ -415,6 +446,7 @@ def run_with_args(args: argparse.Namespace) -> TableState | Dict | None:
             checkpoint_dir=checkpoint_dir,
             upload_notion=getattr(args, 'upload_notion', False),
             notion_config=str(args.notion_config) if getattr(args, 'notion_config', None) else None,
+            upload_log=getattr(args, 'upload_log', False),
         )
 
     # Single file processing
@@ -438,6 +470,8 @@ def run_with_args(args: argparse.Namespace) -> TableState | Dict | None:
         temperature=args.temperature,
         base_url=args.base_url,
         config_path=str(args.config_path) if args.config_path else None,
+        azure_deployment=getattr(args, 'azure_deployment', None),
+        azure_endpoint=getattr(args, 'azure_endpoint', None),
         qa_only=getattr(args, 'qa_only', False),
         domain=domain,
         # Checkpointing options
@@ -498,6 +532,11 @@ def run_with_args(args: argparse.Namespace) -> TableState | Dict | None:
                 provider=args.provider,
             )
             print(f"\n✅ Notion upload complete: {upload_summary['success']}/{upload_summary['total']} succeeded")
+
+            # Save uploaded image path to log file if requested
+            if getattr(args, 'upload_log', False) and upload_summary['success'] > 0:
+                _save_upload_log(str(input_path))
+
         except ImportError as e:
             print(f"\n❌ Notion upload failed: {e}")
             print("   Install with: pip install notion-client")
@@ -517,6 +556,8 @@ def run_batch_for_folder(
     temperature: float = 0.2,
     base_url: str | None = None,
     config_path: str | None = None,
+    azure_deployment: str | None = None,
+    azure_endpoint: str | None = None,
     qa_only: bool = False,
     output_dir: Path | None = None,
     max_workers: int = 3,
@@ -531,6 +572,7 @@
     checkpoint_dir: str | None = None,
     upload_notion: bool = False,
     notion_config: str | None = None,
+    upload_log: bool = False,
 ) -> Dict[str, any]:
     """
     Execute the flow for all images in a folder (batch processing).
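The expanded signature keeps batch runs drivable from code as well as from the CLI. A sketch of a batch invocation exercising the new per-result Notion upload and the upload log; the first positional argument (the input folder) and the `domain` keyword are assumptions about parts of the signature not shown in these hunks, and the folder name is one of the ignored input directories from the .gitignore change above:

```python
from pathlib import Path
from generate_synthetic_table.runner import run_batch_for_folder

# Assumed keywords: the input-folder positional and `domain` are not visible
# in the hunks above; the remaining keywords match the signature shown.
summary = run_batch_for_folder(
    Path("I_origin_0"),
    provider="azure",
    model="gpt-4o-mini",
    qa_only=True,
    max_workers=3,
    domain="insurance",   # required when upload_notion=True
    upload_notion=True,   # upload each result right after its QA generation
    upload_log=True,      # append uploaded image paths to tests/choi/upload_fin.txt
)
print(summary.get("notion_upload"))
```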
@@ -542,6 +584,8 @@ def run_batch_for_folder(
         temperature: Sampling temperature
         base_url: Custom base URL
         config_path: Config path for gemini_pool
+        azure_deployment: Azure OpenAI deployment name
+        azure_endpoint: Azure OpenAI endpoint URL
         qa_only: Generate QA only without synthetic data
         output_dir: Output directory for results
         max_workers: Number of parallel workers
@@ -664,6 +708,8 @@ def process_task(task: Dict) -> Dict:
             temperature=temperature,
             base_url=base_url,
             config_path=config_path,
+            azure_deployment=azure_deployment,
+            azure_endpoint=azure_endpoint,
             qa_only=qa_only,
             domain=domain,
             # Checkpointing options
@@ -696,6 +742,7 @@
             "status": "success" if not result.get("errors") else "partial",
             "qa_count": len(result.get("qa_results", [])),
             "output_file": str(output_file),
+            "token_usage": result.get("token_usage", 0),
             "errors": result.get("errors", []),
         }
@@ -707,6 +754,28 @@
                 "error": str(e),
             }
 
+    # Initialize Notion uploader if needed
+    notion_uploader = None
+    notion_upload_success = 0
+    notion_upload_failed = 0
+
+    if upload_notion:
+        if not domain:
+            print("⚠️ --upload-notion requires --domain to be specified. Notion upload disabled.")
+            upload_notion = False
+        else:
+            try:
+                from .notion_uploader import NotionUploader
+                notion_uploader = NotionUploader(config_path=notion_config)
+                print(f"📤 Notion upload enabled (domain: {domain})")
+            except ImportError as e:
+                print(f"⚠️ Notion upload disabled: {e}")
+                print("   Install with: pip install notion-client")
+                upload_notion = False
+            except Exception as e:
+                print(f"⚠️ Notion upload disabled: {e}")
+                upload_notion = False
+
     # Parallel processing with ThreadPoolExecutor
     with ThreadPoolExecutor(max_workers=max_workers) as executor:
         future_to_task = {
@@ -723,9 +792,70 @@
             if result["status"] == "success":
                 success_count += 1
                 print(f"✅ {result['name']} - {result['qa_count']} QA pairs")
+
+                # Upload to Notion immediately after QA generation
+                if upload_notion and notion_uploader and result.get("output_file"):
+                    try:
+                        output_file = Path(result["output_file"])
+                        if output_file.exists():
+                            data = json.loads(output_file.read_text(encoding="utf-8"))
+                            qa_results = data.get("qa_results", [])
+                            if qa_results:
+                                # Use token_usage from result (already available)
+                                token_usage = result.get("token_usage", 0)
+                                upload_result = notion_uploader.upload_qa_result(
+                                    domain=domain,
+                                    image_path=data.get("name", result["name"]),
+                                    qa_results=qa_results,
+                                    table_summary=data.get("table_summary"),
+                                    token_usage=token_usage,
+                                    provider=provider,
+                                )
+                                notion_upload_success += 1
+                                print(f"   📤 Uploaded to Notion: {upload_result.get('created_count', 0)} rows (tokens: {token_usage})")
+
+                                # Save to upload log if requested
+                                if upload_log:
+                                    image_paths = data.get("image_paths", [])
+                                    for img_path in image_paths:
+                                        _save_upload_log(img_path)
+                    except Exception as e:
+                        notion_upload_failed += 1
+                        print(f"   ⚠️ Notion upload failed: {e}")
+
             elif result["status"] == "partial":
                 success_count += 1
                 print(f"⚠️ {result['name']} - {result['qa_count']} QA pairs (with errors)")
+
+                # Upload partial results to Notion
+                if upload_notion and notion_uploader and result.get("output_file"):
+                    try:
+                        output_file = Path(result["output_file"])
+                        if output_file.exists():
+                            data = json.loads(output_file.read_text(encoding="utf-8"))
+                            qa_results = data.get("qa_results", [])
+                            if qa_results:
+                                # Use token_usage from result
+                                token_usage = result.get("token_usage", 0)
+                                upload_result = notion_uploader.upload_qa_result(
+                                    domain=domain,
+                                    image_path=data.get("name", result["name"]),
+                                    qa_results=qa_results,
+                                    table_summary=data.get("table_summary"),
+                                    token_usage=token_usage,
+                                    provider=provider,
+                                )
+                                notion_upload_success += 1
+                                print(f"   📤 Uploaded to Notion: {upload_result.get('created_count', 0)} rows (tokens: {token_usage})")
+
+                                # Save to upload log if requested
+                                if upload_log:
+                                    image_paths = data.get("image_paths", [])
+                                    for img_path in image_paths:
+                                        _save_upload_log(img_path)
+                    except Exception as e:
+                        notion_upload_failed += 1
+                        print(f"   ⚠️ Notion upload failed: {e}")
             else:
                 failed_count += 1
                 print(f"❌ {result['name']} - {result.get('error', 'Unknown error')}")
@@ -752,6 +882,15 @@
         "checkpoint_dir": checkpoint_dir,
         "results": results,
     }
+
+    # Add Notion upload summary if enabled
+    if upload_notion and notion_uploader:
+        summary["notion_upload"] = {
+            "enabled": True,
+            "success": notion_upload_success,
+            "failed": notion_upload_failed,
+            "total": notion_upload_success + notion_upload_failed,
+        }
 
     summary_file = output_dir / "_summary.json"
     summary_file.write_text(
@@ -763,53 +902,10 @@
     print(f"{'='*50}")
     print(f"Batch processing complete!")
     print(f"Total: {len(image_files)}, Success: {success_count}, Failed: {failed_count}")
+    if upload_notion and notion_uploader:
+        print(f"Notion Upload: {notion_upload_success} succeeded, {notion_upload_failed} failed")
     print(f"Summary saved to: {summary_file}")
 
-    # Upload to Notion if requested
-    if upload_notion:
-        if not domain:
-            print("\n⚠️ --upload-notion requires --domain to be specified. Skipping upload.")
-        else:
-            print(f"\n{'='*50}")
-            print(f"Uploading to Notion (domain: {domain})...")
-
-            # Prepare results for Notion upload
-            # Load the actual QA results from saved files
-            notion_results = []
-            for result in results:
-                if result.get("status") in ("success", "partial"):
-                    output_file = result.get("output_file")
-                    if output_file and Path(output_file).exists():
-                        try:
-                            data = json.loads(Path(output_file).read_text(encoding="utf-8"))
-                            notion_results.append({
-                                "image_path": data.get("name", result.get("name")),
-                                "qa_results": data.get("qa_results", []),
-                                "table_summary": data.get("table_summary"),
-                                "token_usage": data.get("token_usage", 0),
-                            })
-                        except Exception as e:
-                            print(f"⚠️ Failed to load {output_file}: {e}")
-
-            if notion_results:
-                try:
-                    upload_summary = upload_to_notion(
-                        domain=domain,
-                        results=notion_results,
-                        config_path=notion_config,
-                        verbose=True,
-                        provider=provider,
-                    )
-                    summary["notion_upload"] = upload_summary
-                    print(f"\n✅ Notion upload complete: {upload_summary['success']}/{upload_summary['total']} succeeded")
-                except ImportError as e:
-                    print(f"\n❌ Notion upload failed: {e}")
-                    print("   Install with: pip install notion-client")
-                except Exception as e:
-                    print(f"\n❌ Notion upload failed: {e}")
-            else:
-                print("⚠️ No successful results to upload.")
-
     return summary
diff --git a/tests/choi/upload_fin.txt b/tests/choi/upload_fin.txt
new file mode 100644
index 0000000..da757cb
--- /dev/null
+++ b/tests/choi/upload_fin.txt
@@ -0,0 +1 @@
+I_origin_0/I_table_1.png
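After a batch run, the `_summary.json` written above carries the upload tally alongside the per-image results. A small sketch for inspecting it; the `output/` location is an assumption (it is whatever the output directory resolved to), while the `notion_upload` and `results` entries match the structures built above:

```python
import json
from pathlib import Path

# Output directory assumed; key names match the summary dict built above.
summary = json.loads(Path("output/_summary.json").read_text(encoding="utf-8"))

notion = summary.get("notion_upload", {})
if notion.get("enabled"):
    print(f"Notion upload: {notion['success']}/{notion['total']} succeeded, {notion['failed']} failed")

for item in summary.get("results", []):
    print(item.get("name"), item.get("status"), item.get("token_usage", 0))
```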