diff --git a/spp_dci/schemas/constants.py b/spp_dci/schemas/constants.py index 912971ba..3d4a49c8 100644 --- a/spp_dci/schemas/constants.py +++ b/spp_dci/schemas/constants.py @@ -4,13 +4,17 @@ class RegistryType(StrEnum): - """DCI Registry types.""" + """DCI Registry types (SPDCI spec compliant). - SOCIAL_REGISTRY = "SOCIAL_REGISTRY" - IBR = "IBR" - CRVS = "CRVS" - DISABILITY_REGISTRY = "DR" - FUNCTIONAL_REGISTRY = "FR" + Values use the namespaced format as specified in SPDCI API Standards. + Reference: src/registry/*/RegistryType.yaml + """ + + SOCIAL_REGISTRY = "ns:org:RegistryType:Social" + CRVS = "ns:org:RegistryType:Civil" + IBR = "ns:org:RegistryType:IBR" + DISABILITY_REGISTRY = "ns:org:RegistryType:DR" + FUNCTIONAL_REGISTRY = "ns:org:RegistryType:FR" class RegistryEventType(StrEnum): diff --git a/spp_dci_client/services/client.py b/spp_dci_client/services/client.py index cefbe1c2..526905d1 100644 --- a/spp_dci_client/services/client.py +++ b/spp_dci_client/services/client.py @@ -169,6 +169,7 @@ def search_by_id( page: int = 1, page_size: int = 10, async_mode: bool = False, + registry_event_type: str | None = None, ) -> dict: """Search by identifier type and value (convenience method). @@ -179,6 +180,7 @@ def search_by_id( page: Page number (1-indexed) page_size: Records per page async_mode: If True, use async endpoint + registry_event_type: Event type filter (BIRTH, DEATH, etc.) Returns: SearchResponse or ACK dict @@ -191,8 +193,56 @@ def search_by_id( record_type=record_type, page=page, page_size=page_size, + registry_event_type=registry_event_type, ) + def search_by_id_opencrvs( + self, + identifier_type: str, + identifier_value: str, + event_type: str = "birth", + page: int = 1, + page_size: int = 10, + async_mode: bool = False, + ) -> dict: + """Search by identifier using OpenCRVS's non-standard format. + + OpenCRVS doesn't support the standard DCI idtype-value query format. + Instead, it requires an expression query with the identifier nested. + + Args: + identifier_type: Identifier type (UIN, BRN, MRN, DRN, etc.) + identifier_value: Identifier value + event_type: Registry event type (birth, death, etc.) - lowercase + page: Page number (1-indexed) + page_size: Records per page + async_mode: If True, use async endpoint + + Returns: + SearchResponse or ACK dict + """ + # Build OpenCRVS-style query for ID lookup + # Format: { type: "BRN"|"UIN"|"DRN", value: "" } + query = { + "type": identifier_type, + "value": identifier_value, + } + + # Build envelope in OpenCRVS format + envelope = self._build_search_envelope_opencrvs( + query=query, + query_type="idtype-value", + event_type=event_type, + page=page, + page_size=page_size, + ) + + if async_mode: + return self._make_request(ENDPOINT_ASYNC_SEARCH, envelope) + else: + endpoint = self.data_source.search_endpoint or ENDPOINT_SYNC_SEARCH + return self._make_request(endpoint, envelope) + def search_by_predicate( self, predicate: str, @@ -231,49 +281,57 @@ def search_by_expression( registry_type: str | None = None, registry_event_type: str | None = None, async_mode: bool = False, + use_opencrvs_format: bool = False, ) -> dict: """Search using expression query (e.g., date range filters). - This supports DCI-compliant expression queries using ExpPredicate format. + This supports both DCI-compliant expression queries and OpenCRVS's + non-standard format. Args: - expression: DCI ExpPredicateWithCondition list, e.g.: - [ - { - "seq_num": 1, - "expression1": { - "attribute_name": "dateOfEvent", - "operator": "ge", - "attribute_value": "2020-01-01" - }, - "condition": "and", - "expression2": { - "attribute_name": "dateOfEvent", - "operator": "le", - "attribute_value": "2026-02-10" - } - } - ] + expression: Query expression. For standard DCI, a list of + ExpPredicateWithCondition dicts. For OpenCRVS format, a dict + of attribute filters (e.g., {"birthDate": {"type": "range", ...}}). record_type: PERSON, GROUP, etc. page: Page number (1-indexed) page_size: Records per page registry_type: Registry type (defaults to data source registry type) registry_event_type: Optional event type filter (e.g., "BIRTH", "DEATH") async_mode: If True, use async endpoint + use_opencrvs_format: If True, wrap expression in OpenCRVS query + structure and use OpenCRVS envelope format. Returns: SearchResponse or ACK dict """ - # Build envelope directly with expression query (bypass _parse_query) - envelope = self._build_search_envelope( - query_type=QueryType.EXPRESSION, - query=expression, - registry_type=registry_type or self._get_registry_type(), - registry_event_type=registry_event_type, - record_type=record_type, - page=page, - page_size=page_size, - ) + if use_opencrvs_format: + # Wrap the caller's expression in OpenCRVS query structure + opencrvs_query = { + "type": "ns:org:QueryType:expression", + "value": { + "expression": { + "query": expression, + } + }, + } + envelope = self._build_search_envelope_opencrvs( + query=opencrvs_query, + query_type="expression", + event_type=registry_event_type, + page=page, + page_size=page_size, + ) + else: + # Standard DCI path (unchanged) + envelope = self._build_search_envelope( + query_type=QueryType.EXPRESSION, + query=expression, + registry_type=registry_type or self._get_registry_type(), + registry_event_type=registry_event_type, + record_type=record_type, + page=page, + page_size=page_size, + ) if async_mode: return self._make_request(ENDPOINT_ASYNC_SEARCH, envelope) @@ -379,14 +437,10 @@ def _search_by_date_range_opencrvs( ) -> dict: """Search by date range using OpenCRVS's non-standard envelope format. - OpenCRVS uses a custom format that differs from DCI spec: - - reg_type: lowercase event type (e.g., "birth") instead of registry type - - No reg_event_type field - - Requires consent object and locale field - - Expression query format: { type: "ns:org:QueryType:expression", value: { : { type: "range", ... } } } + Builds an expression query with the correct OpenCRVS nesting: + { type, value: { expression: { query: { : { type: "range", ... } } } } } - This method exists for interoperability with OpenCRVS servers that don't - follow the standard DCI envelope format. + Then delegates to _build_search_envelope_opencrvs for the envelope wrapper. Args: start_date: Start date in ISO format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SSZ) @@ -401,11 +455,26 @@ def _search_by_date_range_opencrvs( Returns: SearchResponse or ACK dict """ - # Build envelope directly in OpenCRVS format (bypassing standard _build_search_envelope) + # Build OpenCRVS-style expression query with correct nesting + expression_query = { + "type": "ns:org:QueryType:expression", + "value": { + "expression": { + "query": { + attribute_name: { + "type": "range", + "gte": start_date, + "lte": end_date, + } + } + } + }, + } + + # Build envelope in OpenCRVS format (bypassing standard _build_search_envelope) envelope = self._build_search_envelope_opencrvs( - start_date=start_date, - end_date=end_date, - attribute_name=attribute_name, + query=expression_query, + query_type="expression", event_type=event_type, page=page, page_size=page_size, @@ -419,9 +488,8 @@ def _search_by_date_range_opencrvs( def _build_search_envelope_opencrvs( self, - start_date: str, - end_date: str, - attribute_name: str, + query: dict, + query_type: str, event_type: str | None, page: int, page_size: int, @@ -429,49 +497,34 @@ def _build_search_envelope_opencrvs( """Build search envelope in OpenCRVS's non-standard format. OpenCRVS expects: - - reg_type: lowercase event type (e.g., "birth"), NOT registry type - - No reg_event_type field + - reg_type: "ns:org:RegistryType:Civil" + - reg_event_type: lowercase event type (e.g., "birth") - Required consent object - Required locale field - - Expression query with range syntax + - Pre-built query object (e.g., expression with nested query structure) Args: - start_date: Start date - end_date: End date - attribute_name: Attribute to filter on + query: Pre-built query object (e.g., expression query dict) + query_type: Query type string (e.g., "expression", "idtype-value") event_type: Event type (BIRTH, DEATH, etc.) page: Page number page_size: Page size Returns: - Envelope dict in OpenCRVS format + Envelope dict in OpenCRVS format (no signature wrapper) """ now = datetime.now(UTC) transaction_id = str(uuid.uuid4()) reference_id = str(uuid.uuid4()) message_id = str(uuid.uuid4()) - # OpenCRVS uses lowercase event type for reg_type (e.g., "birth" not "BIRTH" or "CRVS") - reg_type = (event_type or "birth").lower() - - # Build OpenCRVS-style expression query - expression_query = { - "type": "ns:org:QueryType:expression", - "value": { - attribute_name: { - "type": "range", - "gte": start_date, - "lte": end_date, - } - }, - } - - # Build search criteria in OpenCRVS format (no reg_event_type) + # Build search criteria in OpenCRVS format search_criteria = { "version": "1.0.0", - "reg_type": reg_type, - "query_type": "expression", - "query": expression_query, + "reg_type": RegistryType.CRVS.value, + "reg_event_type": (event_type or "birth").lower(), + "query_type": query_type, + "query": query, "sort": [ { "attribute_name": "createdAt", diff --git a/spp_dci_client/tests/test_client_service.py b/spp_dci_client/tests/test_client_service.py index cebed1a7..96ccfb0e 100644 --- a/spp_dci_client/tests/test_client_service.py +++ b/spp_dci_client/tests/test_client_service.py @@ -24,7 +24,7 @@ def _create_test_data_source(self, **kwargs): "auth_type": "none", "our_sender_id": "openspp.example.org", "our_callback_uri": "https://openspp.example.org/callback", - "registry_type": "CRVS", + "registry_type": "ns:org:RegistryType:Civil", } vals.update(kwargs) return self.DataSource.create(vals) @@ -543,12 +543,12 @@ def test_get_registry_type_from_data_source(self): """Test registry type is retrieved from data source""" from ..services.client import DCIClient - ds = self._create_test_data_source(registry_type="CRVS") + ds = self._create_test_data_source(registry_type="ns:org:RegistryType:Civil") client = DCIClient(ds, self.env) registry_type = client._get_registry_type() - self.assertEqual(registry_type, "CRVS") + self.assertEqual(registry_type, "ns:org:RegistryType:Civil") def test_get_registry_type_default(self): """Test default registry type when not configured""" @@ -560,7 +560,7 @@ def test_get_registry_type_default(self): registry_type = client._get_registry_type() - self.assertEqual(registry_type, "SOCIAL_REGISTRY") + self.assertEqual(registry_type, "ns:org:RegistryType:Social") def test_search_uses_custom_endpoint(self): """Test search uses custom endpoint if configured""" @@ -591,7 +591,7 @@ def test_build_search_envelope_structure(self): envelope = client._build_search_envelope( query_type=QueryType.IDTYPE_VALUE, query={"type": "UIN", "value": "12345678"}, - registry_type="CRVS", + registry_type="ns:org:RegistryType:Civil", registry_event_type="BIRTH", record_type="PERSON", page=1, @@ -617,7 +617,7 @@ def test_build_search_envelope_structure(self): # Verify search criteria search_criteria = search_request["search_criteria"] self.assertEqual(search_criteria["version"], "1.0.0") - self.assertEqual(search_criteria["reg_type"], "CRVS") + self.assertEqual(search_criteria["reg_type"], "ns:org:RegistryType:Civil") self.assertEqual(search_criteria["reg_event_type"], "BIRTH") self.assertEqual(search_criteria["query_type"], QueryType.IDTYPE_VALUE) self.assertEqual(search_criteria["query"]["type"], "UIN") @@ -858,3 +858,194 @@ def test_envelope_meta_field(self): # meta is required by DCI spec (even if empty) self.assertIn("meta", envelope["header"]) self.assertEqual(envelope["header"]["meta"], {}) + + +class TestOpenCRVSEnvelopeFormat(TransactionCase): + """Test OpenCRVS envelope format for _build_search_envelope_opencrvs.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.DataSource = cls.env["spp.dci.data.source"] + + def _create_test_data_source(self, **kwargs): + """Helper to create a test data source""" + vals = { + "name": "Test CRVS", + "code": "test_crvs", + "base_url": "https://crvs.example.org/api", + "auth_type": "none", + "our_sender_id": "openspp.example.org", + "our_callback_uri": "https://openspp.example.org/callback", + "registry_type": "ns:org:RegistryType:Civil", + } + vals.update(kwargs) + return self.DataSource.create(vals) + + def _build_opencrvs_envelope(self, client, event_type=None): + """Helper to build an OpenCRVS envelope via search_by_date_range.""" + with patch.object(client, "_make_request") as mock_request: + mock_request.return_value = {"status": "success"} + client.search_by_date_range( + start_date="2020-01-01", + end_date="2026-02-10", + attribute_name="dateOfEvent", + event_type=event_type, + use_opencrvs_format=True, + ) + return mock_request.call_args[0][1] + + def test_opencrvs_envelope_reg_type(self): + """reg_type must be 'ns:org:RegistryType:Civil' (not lowercase event type).""" + from ..services.client import DCIClient + + ds = self._create_test_data_source() + client = DCIClient(ds, self.env) + + envelope = self._build_opencrvs_envelope(client) + search_criteria = envelope["message"]["search_request"][0]["search_criteria"] + + self.assertEqual(search_criteria["reg_type"], "ns:org:RegistryType:Civil") + + def test_opencrvs_envelope_reg_event_type(self): + """reg_event_type must be present and default to 'birth' (lowercase).""" + from ..services.client import DCIClient + + ds = self._create_test_data_source() + client = DCIClient(ds, self.env) + + envelope = self._build_opencrvs_envelope(client) + search_criteria = envelope["message"]["search_request"][0]["search_criteria"] + + self.assertIn("reg_event_type", search_criteria) + self.assertEqual(search_criteria["reg_event_type"], "birth") + + def test_opencrvs_envelope_reg_event_type_death(self): + """Passing event_type='DEATH' produces reg_event_type: 'death'.""" + from ..services.client import DCIClient + + ds = self._create_test_data_source() + client = DCIClient(ds, self.env) + + envelope = self._build_opencrvs_envelope(client, event_type="DEATH") + search_criteria = envelope["message"]["search_request"][0]["search_criteria"] + + self.assertEqual(search_criteria["reg_event_type"], "death") + + def test_opencrvs_envelope_expression_query_structure(self): + """Expression query must be nested: type/value/expression/query.""" + from ..services.client import DCIClient + + ds = self._create_test_data_source() + client = DCIClient(ds, self.env) + + envelope = self._build_opencrvs_envelope(client) + search_criteria = envelope["message"]["search_request"][0]["search_criteria"] + query = search_criteria["query"] + + # Top-level structure + self.assertEqual(query["type"], "ns:org:QueryType:expression") + self.assertIn("value", query) + + # value -> expression -> query -> { attribute_name: { type, gte, lte } } + value = query["value"] + self.assertIn("expression", value) + self.assertIn("query", value["expression"]) + + attribute_query = value["expression"]["query"] + self.assertIn("dateOfEvent", attribute_query) + self.assertEqual(attribute_query["dateOfEvent"]["type"], "range") + self.assertEqual(attribute_query["dateOfEvent"]["gte"], "2020-01-01") + self.assertEqual(attribute_query["dateOfEvent"]["lte"], "2026-02-10") + + def test_opencrvs_envelope_has_consent_and_locale(self): + """OpenCRVS envelope must include consent and locale.""" + from ..services.client import DCIClient + + ds = self._create_test_data_source() + client = DCIClient(ds, self.env) + + envelope = self._build_opencrvs_envelope(client) + search_request_item = envelope["message"]["search_request"][0] + + self.assertIn("consent", search_request_item) + self.assertIn("locale", search_request_item) + self.assertEqual(search_request_item["locale"], "eng") + + def test_opencrvs_envelope_no_signature_key(self): + """OpenCRVS envelope must not have a 'signature' key.""" + from ..services.client import DCIClient + + ds = self._create_test_data_source() + client = DCIClient(ds, self.env) + + envelope = self._build_opencrvs_envelope(client) + + self.assertNotIn("signature", envelope) + + def test_search_by_expression_opencrvs_format(self): + """search_by_expression(use_opencrvs_format=True) produces OpenCRVS envelope.""" + from ..services.client import DCIClient + + ds = self._create_test_data_source() + client = DCIClient(ds, self.env) + + expression = {"birthDate": {"type": "range", "gte": "2020-01-01", "lte": "2026-12-31"}} + + with patch.object(client, "_make_request") as mock_request: + mock_request.return_value = {"status": "success"} + client.search_by_expression( + expression=expression, + use_opencrvs_format=True, + ) + envelope = mock_request.call_args[0][1] + + # Should use OpenCRVS format (no signature, has consent/locale) + self.assertNotIn("signature", envelope) + search_request_item = envelope["message"]["search_request"][0] + self.assertIn("consent", search_request_item) + self.assertIn("locale", search_request_item) + + # Query should be wrapped in expression/query structure + search_criteria = search_request_item["search_criteria"] + self.assertEqual(search_criteria["reg_type"], "ns:org:RegistryType:Civil") + query = search_criteria["query"] + self.assertEqual(query["type"], "ns:org:QueryType:expression") + self.assertIn("expression", query["value"]) + self.assertIn("query", query["value"]["expression"]) + self.assertEqual(query["value"]["expression"]["query"], expression) + + def test_search_by_expression_standard_unchanged(self): + """search_by_expression() without flag still uses standard DCI path.""" + from ..services.client import DCIClient + + ds = self._create_test_data_source() + client = DCIClient(ds, self.env) + + expression = [ + { + "seq_num": 1, + "expression1": { + "attribute_name": "dateOfEvent", + "operator": "ge", + "attribute_value": "2020-01-01", + }, + "condition": "and", + "expression2": { + "attribute_name": "dateOfEvent", + "operator": "le", + "attribute_value": "2026-02-10", + }, + } + ] + + with patch.object(client, "_make_request") as mock_request: + mock_request.return_value = {"status": "success"} + client.search_by_expression(expression=expression) + envelope = mock_request.call_args[0][1] + + # Standard DCI format has signature + self.assertIn("signature", envelope) + # Standard DCI format uses registry type not OpenCRVS format + search_criteria = envelope["message"]["search_request"][0]["search_criteria"] + self.assertEqual(search_criteria["reg_type"], "ns:org:RegistryType:Civil") diff --git a/spp_dci_demo/__init__.py b/spp_dci_demo/__init__.py new file mode 100644 index 00000000..bd701f00 --- /dev/null +++ b/spp_dci_demo/__init__.py @@ -0,0 +1,5 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +from . import models +from . import utils +from .hooks import post_init_hook diff --git a/spp_dci_demo/__manifest__.py b/spp_dci_demo/__manifest__.py new file mode 100644 index 00000000..3c4e0d6e --- /dev/null +++ b/spp_dci_demo/__manifest__.py @@ -0,0 +1,29 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +{ + "name": "OpenSPP DCI Demo", + "version": "19.0.1.0.0", + "category": "OpenSPP", + "license": "LGPL-3", + "website": "https://github.com/OpenSPP/OpenSPP2", + "author": "OpenSPP.org", + "depends": [ + "spp_mis_demo_v2", + "spp_dci_client", + "spp_change_request_v2", + "spp_programs", + ], + "data": [ + "security/ir.model.access.csv", + "data/vocabulary_data.xml", + "data/system_parameters.xml", + "data/demo_household.xml", + "views/cr_detail_add_member_view.xml", + "views/change_request_view.xml", + ], + "demo": [], + "post_init_hook": "post_init_hook", + "installable": True, + "application": False, + "auto_install": False, + "summary": "DCI Demo: Birth Verification for Child Benefit Enrollment", +} diff --git a/spp_dci_demo/data/demo_household.xml b/spp_dci_demo/data/demo_household.xml new file mode 100644 index 00000000..3d39e6ab --- /dev/null +++ b/spp_dci_demo/data/demo_household.xml @@ -0,0 +1,48 @@ + + + + + + + MASTERS, Adam + Adam + Masters + 1995-08-14 + + + + + + + + MASTERS, Mary + Mary + Masters + 2002-02-01 + + + + + + + + Masters Household + + + + + + + + + + + + + + + + diff --git a/spp_dci_demo/data/system_parameters.xml b/spp_dci_demo/data/system_parameters.xml new file mode 100644 index 00000000..7be9d41a --- /dev/null +++ b/spp_dci_demo/data/system_parameters.xml @@ -0,0 +1,33 @@ + + + + + + + spp_dci_demo.auto_approve_on_match + True + + + + + spp_dci_demo.default_crvs_data_source + + + + + diff --git a/spp_dci_demo/data/vocabulary_data.xml b/spp_dci_demo/data/vocabulary_data.xml new file mode 100644 index 00000000..61919586 --- /dev/null +++ b/spp_dci_demo/data/vocabulary_data.xml @@ -0,0 +1,15 @@ + + + + + + brn + Birth Registration Number (BRN) + individual + urn:dci:id:brn + Birth Registration Number from a civil registration system (e.g., OpenCRVS) + 10 + + diff --git a/spp_dci_demo/hooks.py b/spp_dci_demo/hooks.py new file mode 100644 index 00000000..b03ebbeb --- /dev/null +++ b/spp_dci_demo/hooks.py @@ -0,0 +1,41 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +import logging + +_logger = logging.getLogger(__name__) + + +def post_init_hook(env): + """Post-installation hook to configure enrollment program. + + Finds the first available program and sets it as the default + enrollment program for the DCI demo. + """ + _logger.info("Running spp_dci_demo post_init_hook") + + # Check if spp.program model exists + if "spp.program" not in env: + _logger.info("spp.program model not available, skipping enrollment program setup") + return + + # Find the Conditional Child Grant program (the target for DCI demo enrollment) + program = env["spp.program"].search([("name", "=", "Conditional Child Grant")], limit=1) + if not program: + # Fall back to any program if not found + program = env["spp.program"].search([], limit=1) + if not program: + _logger.info("No programs found, enrollment_program_id not configured") + return + + # Set the system parameter + # sudo() is intentional: system parameters require admin access + config_params = env["ir.config_parameter"].sudo() # nosemgrep: odoo-sudo-without-context + config_params.set_param( + "spp_dci_demo.enrollment_program_id", + str(program.id), + ) + _logger.info( + "Set spp_dci_demo.enrollment_program_id to %s (%s)", + program.id, + program.name, + ) diff --git a/spp_dci_demo/models/__init__.py b/spp_dci_demo/models/__init__.py new file mode 100644 index 00000000..dd0463d7 --- /dev/null +++ b/spp_dci_demo/models/__init__.py @@ -0,0 +1,5 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +from . import cr_detail_add_member +from . import cr_apply_add_member +from . import change_request diff --git a/spp_dci_demo/models/change_request.py b/spp_dci_demo/models/change_request.py new file mode 100644 index 00000000..91605cbf --- /dev/null +++ b/spp_dci_demo/models/change_request.py @@ -0,0 +1,138 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +"""Extend spp.change.request with computed DCI verification fields. + +These fields pull DCI birth verification data from the detail record +and make it visible on the main CR form, so reviewers can see +verification status without navigating into the detail sub-form. +""" + +import logging + +from markupsafe import Markup, escape + +from odoo import api, fields, models + +_logger = logging.getLogger(__name__) + + +class SPPChangeRequestDCI(models.Model): + """Extend CR with computed DCI verification fields for reviewer UX.""" + + _inherit = "spp.change.request" + + dci_verification_status = fields.Selection( + [ + ("unverified", "Unverified"), + ("verified", "Verified"), + ("not_found", "Not Found"), + ("error", "Error"), + ], + compute="_compute_dci_verification", + string="DCI Verification Status", + ) + + dci_verification_html = fields.Html( + compute="_compute_dci_verification", + string="DCI Verification Summary", + sanitize=False, + ) + + dci_data_match = fields.Boolean( + compute="_compute_dci_verification", + string="DCI Data Matches", + ) + + @api.depends("detail_res_model", "detail_res_id", "approval_state") + def _compute_dci_verification(self): + for rec in self: + rec.dci_verification_status = False + rec.dci_verification_html = "" + rec.dci_data_match = False + + # Only applicable for add_member detail type + if rec.detail_res_model != "spp.cr.detail.add_member": + continue + + detail = rec.get_detail() + if not detail: + continue + + # Check if the detail has DCI fields (from spp_dci_demo) + if not hasattr(detail, "birth_verification_status"): + continue + + status = detail.birth_verification_status + if not status or status == "unverified": + rec.dci_verification_status = status or "unverified" + continue + + rec.dci_verification_status = status + rec.dci_data_match = detail.dci_data_match + + # Build HTML summary + badge_class = { + "verified": "bg-success", + "not_found": "bg-warning", + "error": "bg-danger", + }.get(status, "bg-secondary") + + status_label = { + "verified": "Verified", + "not_found": "Not Found", + "error": "Error", + }.get(status, status) + + parts = [] + + # Status badge + parts.append( + Markup( + '
' + 'Birth Verification:' + '{}' + "
" + ).format(badge_class, escape(status_label)) + ) + + # BRN + if detail.birth_registration_number: + parts.append( + Markup('
BRN: {}
').format( + escape(detail.birth_registration_number) + ) + ) + + # Data match indicator + if status == "verified": + if detail.dci_data_match: + parts.append( + Markup( + '
' + 'Data Match: ' + '' + 'All fields match' + "
" + ) + ) + else: + parts.append( + Markup( + '
' + 'Data Match: ' + '' + '' + "Mismatch detected" + "
" + ) + ) + + # Verification date + if detail.birth_verification_date: + parts.append( + Markup('
Verified: {}
').format( + escape(str(detail.birth_verification_date)) + ) + ) + + rec.dci_verification_html = Markup("").join(parts) diff --git a/spp_dci_demo/models/cr_apply_add_member.py b/spp_dci_demo/models/cr_apply_add_member.py new file mode 100644 index 00000000..fd4f1333 --- /dev/null +++ b/spp_dci_demo/models/cr_apply_add_member.py @@ -0,0 +1,224 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +import logging + +from odoo import models + +_logger = logging.getLogger(__name__) + + +class SPPCRApplyAddMemberDCI(models.AbstractModel): + """Extend Add Member CR apply to create verified BRN registry ID.""" + + _inherit = "spp.cr.apply.add_member" + + def apply(self, change_request): + """Apply change request and create BRN registry ID if birth was verified. + + Extends the base apply() to: + 1. Create the individual and group membership (via super) + 2. If birth was verified, create a verified BRN registry ID on the individual + 3. Auto-enroll the new individual in the household's programs + """ + # Call parent apply - creates individual and membership + result = super().apply(change_request) + + # Get the detail record + detail = change_request.get_detail() + if not detail: + return result + + # Check if birth was verified and BRN is present + if ( + detail.birth_verification_status == "verified" + and detail.birth_registration_number + and detail.created_individual_id + ): + self._create_verified_brn_registry_id(detail) + + # Auto-enroll in household's programs if enabled + if detail.created_individual_id and change_request.registrant_id: + self._auto_enroll_in_household_programs( + detail.created_individual_id, + change_request.registrant_id, + ) + + return result + + def _create_verified_brn_registry_id(self, detail): + """Create a verified BRN registry ID on the created individual. + + Args: + detail: The CR detail record containing verification data + """ + # Get the BRN ID type vocabulary code + brn_id_type = self.env.ref( + "spp_dci_demo.code_id_type_brn", + raise_if_not_found=False, + ) + + if not brn_id_type: + _logger.warning( + "BRN ID type vocabulary code not found (spp_dci_demo.code_id_type_brn). " + "Cannot create verified registry ID." + ) + return + + # Check if the individual already has a BRN + existing_brn = self.env["spp.registry.id"].search( + [ + ("partner_id", "=", detail.created_individual_id.id), + ("id_type_id", "=", brn_id_type.id), + ], + limit=1, + ) + + if existing_brn: + _logger.info( + "Individual %s already has a BRN registry ID, updating with verification data", + detail.created_individual_id.id, + ) + # Update existing record with verification data + existing_brn.write( + { + "value": detail.birth_registration_number, + "status": "valid", + "verification_method": "dci_api", + "verification_date": detail.birth_verification_date, + "verification_source": self._get_verification_source(detail), + "verification_response": detail.birth_verification_response, + } + ) + else: + # Create new registry ID with verification data + registry_id_vals = { + "partner_id": detail.created_individual_id.id, + "id_type_id": brn_id_type.id, + "value": detail.birth_registration_number, + "status": "valid", + "verification_method": "dci_api", + "verification_date": detail.birth_verification_date, + "verification_source": self._get_verification_source(detail), + "verification_response": detail.birth_verification_response, + } + + self.env["spp.registry.id"].create(registry_id_vals) + + _logger.info( + "Created verified BRN registry ID for individual %s (BRN: %s)", + detail.created_individual_id.id, + detail.birth_registration_number, + ) + + def _get_verification_source(self, detail): + """Get the verification source name from the data source. + + Args: + detail: The CR detail record + + Returns: + String identifying the verification source + """ + if detail.dci_data_source_id: + return detail.dci_data_source_id.name + # Try to get from default + default_source = detail._get_default_dci_data_source() + if default_source: + return default_source.name + return "DCI API" + + def _auto_enroll_in_household_programs(self, individual, household): + """Auto-enroll the household and new individual in the configured program. + + Args: + individual: The newly created individual (res.partner) + household: The household/group (res.partner) + """ + # Get program ID from system parameter + # sudo() is intentional: system parameters require admin access + config_params = self.env["ir.config_parameter"].sudo() # nosemgrep: odoo-sudo-without-context + program_id_str = config_params.get_param("spp_dci_demo.enrollment_program_id", "") + if not program_id_str: + _logger.info("No enrollment program configured (spp_dci_demo.enrollment_program_id)") + return + + try: + program_id = int(program_id_str) + except (ValueError, TypeError): + _logger.warning("Invalid enrollment_program_id: %s", program_id_str) + return + + # Check if program membership model exists + if "spp.program.membership" not in self.env: + _logger.info("spp.program.membership model not available, skipping enrollment") + return + + # Get the program + program = self.env["spp.program"].browse(program_id) + if not program.exists(): + _logger.warning("Enrollment program ID %s does not exist", program_id) + return + + _logger.info( + "Auto-enrolling household %s and members in program %s", + household.id, + program.name, + ) + + # Enroll the household (group) if not already enrolled + self._enroll_partner_in_program(household, program) + + # Enroll all household members including the new child + # Invalidate cache so the newly created membership is included + household.invalidate_recordset(["group_membership_ids"]) + if hasattr(household, "group_membership_ids"): + for membership in household.group_membership_ids: + if membership.individual: + self._enroll_partner_in_program(membership.individual, program) + + def _enroll_partner_in_program(self, partner, program): + """Enroll a partner (individual or group) in a program. + + Args: + partner: The partner to enroll (res.partner) + program: The program to enroll in (spp.program) + """ + # Check if already enrolled + existing = self.env["spp.program.membership"].search( + [ + ("partner_id", "=", partner.id), + ("program_id", "=", program.id), + ], + limit=1, + ) + + if existing: + _logger.info( + "Partner %s already in program %s (state: %s)", + partner.id, + program.name, + existing.state, + ) + return + + # Create enrollment + try: + self.env["spp.program.membership"].create( + { + "partner_id": partner.id, + "program_id": program.id, + "state": "enrolled", + } + ) + _logger.info( + "Enrolled partner %s in program %s", + partner.id, + program.name, + ) + except Exception as e: + _logger.warning( + "Failed to enroll partner %s in program %s: %s", + partner.id, + program.name, + str(e), + ) diff --git a/spp_dci_demo/models/cr_detail_add_member.py b/spp_dci_demo/models/cr_detail_add_member.py new file mode 100644 index 00000000..0a29dcc9 --- /dev/null +++ b/spp_dci_demo/models/cr_detail_add_member.py @@ -0,0 +1,308 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +import json +import logging + +from odoo import _, api, fields, models +from odoo.exceptions import UserError + +from ..utils.dci_verification import ( + check_data_matches, + extract_person_from_dci_response, + parse_dci_response, +) + +_logger = logging.getLogger(__name__) + + +class SPPCRDetailAddMemberDCI(models.Model): + """Extend Add Member CR Detail with DCI birth verification fields.""" + + _inherit = "spp.cr.detail.add_member" + + # Birth Registration Number entered by social worker + birth_registration_number = fields.Char( + string="Birth Registration Number (BRN)", + tracking=True, + help="Birth Registration Number from the civil registry (e.g., OpenCRVS)", + ) + + # Verification status + birth_verification_status = fields.Selection( + selection=[ + ("unverified", "Unverified"), + ("verified", "Verified"), + ("not_found", "Not Found"), + ("error", "Error"), + ], + string="Birth Verification Status", + default="unverified", + tracking=True, + help="Status of birth verification via DCI", + ) + + # Data match status + dci_data_match = fields.Boolean( + string="DCI Data Matches", + readonly=True, + help="Whether the DCI response data matches the CR detail fields", + ) + + # When verification was performed + birth_verification_date = fields.Datetime( + string="Verification Date", + readonly=True, + help="When the birth verification was performed", + ) + + # Raw DCI response for audit + birth_verification_response = fields.Text( + string="Verification Response", + readonly=True, + help="Raw JSON response from DCI verification for audit purposes", + ) + + # Which CRVS registry to verify against + dci_data_source_id = fields.Many2one( + "spp.dci.data.source", + string="DCI Data Source", + domain="[('registry_type', '=', 'ns:org:RegistryType:Civil'), ('active', '=', True)]", + help="DCI data source (CRVS registry) to use for birth verification", + ) + single_dci_data_source = fields.Boolean( + compute="_compute_single_dci_data_source", + ) + + def _compute_single_dci_data_source(self): + count = self.env["spp.dci.data.source"].search_count( + [("registry_type", "=", "ns:org:RegistryType:Civil"), ("active", "=", True)], + ) + is_single = count <= 1 + for rec in self: + rec.single_dci_data_source = is_single + + @api.onchange("birth_registration_number") + def _onchange_birth_registration_number(self): + """Strip whitespace from BRN on change.""" + if self.birth_registration_number: + stripped = self.birth_registration_number.strip().upper() + if stripped != self.birth_registration_number: + self.birth_registration_number = stripped + + @api.onchange("given_name", "family_name", "birthdate", "gender_id", "birth_registration_number") + def _onchange_invalidate_verification(self): + """Reset verification status when verified fields are edited. + + This is a security control: if the user changes name, DOB, gender, or BRN + after verification, the verification is no longer valid and must be re-done. + """ + if self.birth_verification_status == "verified": + self.birth_verification_status = "unverified" + self.dci_data_match = False + self.birth_verification_date = False + self.birth_verification_response = False + return { + "warning": { + "title": _("Verification Invalidated"), + "message": _( + "Verification has been reset because you modified verified data. " + "Please verify again after making changes." + ), + } + } + + @api.model + def _get_default_dci_data_source(self): + """Get the default DCI data source for birth verification. + + Looks for a system parameter or finds the first active CRVS data source. + """ + # Try system parameter first + # sudo() is intentional: system parameters require admin access + config_params = self.env["ir.config_parameter"].sudo() # nosemgrep: odoo-sudo-without-context + param_value = config_params.get_param("spp_dci_demo.default_crvs_data_source") + if param_value: + try: + data_source = self.env["spp.dci.data.source"].browse(int(param_value)) + if data_source.exists() and data_source.active: + return data_source + except (ValueError, TypeError): + pass + + # Fall back to first active CRVS data source + return self.env["spp.dci.data.source"].search( + [ + ("registry_type", "=", "ns:org:RegistryType:Civil"), + ("active", "=", True), + ], + limit=1, + ) + + def action_verify_birth(self): + """Verify birth registration via DCI query to CRVS registry. + + 1. Validate BRN is filled + 2. Get DCI data source + 3. Call DCI client to search by BRN + 4. Parse response and update verification status + """ + self.ensure_one() + + # Validate BRN is provided + if not self.birth_registration_number: + raise UserError(_("Please enter the Birth Registration Number (BRN) before verifying.")) + + # Get the DCI data source + data_source = self.dci_data_source_id or self._get_default_dci_data_source() + if not data_source: + raise UserError( + _( + "No DCI data source configured for birth verification. " + "Please configure a CRVS data source or contact your administrator." + ) + ) + + # Import DCIClient + from odoo.addons.spp_dci_client.services.client import DCIClient + + try: + # Create client and search by BRN + # OpenCRVS requires special format - use search_by_id_opencrvs + client = DCIClient(data_source, self.env) + response = client.search_by_id_opencrvs( + identifier_type="BRN", + identifier_value=self.birth_registration_number, + event_type="birth", + ) + + # Store raw response for audit + response_json = json.dumps(response, indent=2, default=str) + + # Parse response to determine verification status + verification_status = self._parse_dci_response(response) + + # Check if DCI data matches CR detail fields + data_matches = False + if verification_status == "verified": + data_matches = self._check_data_matches_dci_response(response) + + # Update record + self.write( + { + "birth_verification_status": verification_status, + "birth_verification_date": fields.Datetime.now(), + "birth_verification_response": response_json, + "dci_data_match": data_matches, + } + ) + + # Log success + _logger.info( + "Birth verification for BRN %s completed with status: %s, data_match: %s", + self.birth_registration_number, + verification_status, + data_matches, + ) + + # Return notification + if verification_status == "verified": + if data_matches: + message = _("Birth registration verified and data matches!") + else: + message = _("Birth registration verified (data mismatch - manual review required).") + elif verification_status == "not_found": + message = _("No matching birth registration found for BRN: %s") % self.birth_registration_number + else: + message = _("Birth verification completed with status: %s") % verification_status + + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Birth Verification"), + "message": message, + "type": "success" if verification_status == "verified" else "warning", + "sticky": False, + "next": {"type": "ir.actions.client", "tag": "soft_reload"}, + }, + } + + except UserError: + # Re-raise UserError as-is + raise + except Exception as e: + _logger.exception("Birth verification failed for BRN %s", self.birth_registration_number) + self.write( + { + "birth_verification_status": "error", + "birth_verification_date": fields.Datetime.now(), + "birth_verification_response": str(e), + } + ) + raise UserError(_("Birth verification failed: %s") % str(e)) from e + + def _parse_dci_response(self, response): + """Parse DCI response to determine verification status. + + Delegates to the standalone utility function. + + Args: + response: Dict response from DCI search + + Returns: + Verification status string: 'verified', 'not_found', or 'error' + """ + return parse_dci_response(response) + + def _extract_person_from_dci_response(self, response): + """Extract person data from DCI response. + + Delegates to the standalone utility function. + + Args: + response: Dict response from DCI search + + Returns: + Dict with normalized person data or None if not found + """ + return extract_person_from_dci_response(response) + + def _check_data_matches_dci_response(self, response): + """Check if DCI response data matches the CR detail fields. + + Delegates to the standalone utility function, passing field + values from this record. + + Args: + response: Dict response from DCI search + + Returns: + Boolean indicating if data matches + """ + person_data = extract_person_from_dci_response(response) + if not person_data: + _logger.warning("Could not extract person data from DCI response for matching") + return False + + gender_display = (self.gender_id.display or "") if self.gender_id else "" + matches, mismatches = check_data_matches( + person_data, + given_name=self.given_name, + family_name=self.family_name, + birthdate=self.birthdate, + gender_display=gender_display, + ) + + if mismatches: + _logger.info( + "DCI data mismatch for BRN %s: %s", + self.birth_registration_number, + "; ".join(mismatches), + ) + else: + _logger.info( + "DCI data matches CR detail for BRN %s", + self.birth_registration_number, + ) + + return matches diff --git a/spp_dci_demo/pyproject.toml b/spp_dci_demo/pyproject.toml new file mode 100644 index 00000000..4231d0cc --- /dev/null +++ b/spp_dci_demo/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["whool"] +build-backend = "whool.buildapi" diff --git a/spp_dci_demo/security/ir.model.access.csv b/spp_dci_demo/security/ir.model.access.csv new file mode 100644 index 00000000..97dd8b91 --- /dev/null +++ b/spp_dci_demo/security/ir.model.access.csv @@ -0,0 +1 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink diff --git a/spp_dci_demo/tests/__init__.py b/spp_dci_demo/tests/__init__.py new file mode 100644 index 00000000..9634f3e6 --- /dev/null +++ b/spp_dci_demo/tests/__init__.py @@ -0,0 +1,5 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +from . import test_birth_verification +from . import test_apply_creates_brn +from . import test_dci_verification_utils diff --git a/spp_dci_demo/tests/test_apply_creates_brn.py b/spp_dci_demo/tests/test_apply_creates_brn.py new file mode 100644 index 00000000..9a6c5b9b --- /dev/null +++ b/spp_dci_demo/tests/test_apply_creates_brn.py @@ -0,0 +1,370 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +"""Tests for BRN registry ID creation on CR apply.""" + +from odoo import fields +from odoo.tests import TransactionCase, tagged + + +@tagged("post_install", "-at_install") +class TestApplyCreatesBRN(TransactionCase): + """Test that applying Add Member CR creates verified BRN registry ID.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.DataSource = cls.env["spp.dci.data.source"] + cls.CRDetail = cls.env["spp.cr.detail.add_member"] + cls.RegistryId = cls.env["spp.registry.id"] + + # Create a test data source + cls.data_source = cls.DataSource.create( + { + "name": "Test CRVS", + "code": "test_crvs", + "base_url": "https://crvs.example.org/api", + "auth_type": "none", + "our_sender_id": "openspp.test", + "registry_type": "ns:org:RegistryType:Civil", + "active": True, + } + ) + + # Get or create a group for testing + cls.test_group = cls.env["res.partner"].create( + { + "name": "Test Household", + "is_registrant": True, + "is_group": True, + } + ) + + # Get or create a change request type + cls.request_type = cls.env["spp.change.request.type"].search([("code", "=", "add_member")], limit=1) + if not cls.request_type: + cls.request_type = cls.env["spp.change.request.type"].create( + { + "name": "Add Member", + "code": "add_member", + "detail_model": "spp.cr.detail.add_member", + "strategy_model": "spp.cr.apply.add_member", + } + ) + + # Get BRN ID type from vocabulary + cls.brn_id_type = cls.env.ref( + "spp_dci_demo.code_id_type_brn", + raise_if_not_found=False, + ) + + def _create_test_cr_with_detail(self, **detail_kwargs): + """Helper to create a CR with detail record.""" + cr = self.env["spp.change.request"].create( + { + "registrant_id": self.test_group.id, + "request_type_id": self.request_type.id, + } + ) + + detail_vals = { + "registrant_id": self.test_group.id, + "change_request_id": cr.id, + "given_name": "Test", + "family_name": "Child", + "member_name": "CHILD, TEST", + } + detail_vals.update(detail_kwargs) + detail = self.CRDetail.create(detail_vals) + + # Link the detail to the CR + cr.write({"detail_res_id": detail.id}) + + return cr + + def test_apply_creates_verified_brn_when_verified(self): + """Test that applying a verified CR creates a verified BRN registry ID.""" + if not self.brn_id_type: + self.skipTest("BRN ID type not found - module data may not be loaded") + + # Create CR with verified birth + cr = self._create_test_cr_with_detail( + birth_registration_number="UP7D57VSEAZM", + birth_verification_status="verified", + birth_verification_date=fields.Datetime.now(), + birth_verification_response='{"test": "response"}', + dci_data_source_id=self.data_source.id, + ) + + # Apply the change request + apply_strategy = self.env["spp.cr.apply.add_member"] + apply_strategy.apply(cr) + + # Get the created individual + detail = cr.get_detail() + individual = detail.created_individual_id + self.assertTrue(individual, "Individual should be created") + + # Check registry ID was created + registry_id = self.RegistryId.search( + [ + ("partner_id", "=", individual.id), + ("id_type_id", "=", self.brn_id_type.id), + ] + ) + + self.assertTrue(registry_id, "BRN registry ID should be created") + self.assertEqual(registry_id.value, "UP7D57VSEAZM") + self.assertEqual(registry_id.status, "valid") + self.assertEqual(registry_id.verification_method, "dci_api") + self.assertTrue(registry_id.is_verified) + self.assertTrue(registry_id.verification_date) + self.assertEqual(registry_id.verification_source, self.data_source.name) + self.assertTrue(registry_id.verification_response) + + def test_apply_no_brn_when_unverified(self): + """Test that unverified CR does not create BRN registry ID.""" + if not self.brn_id_type: + self.skipTest("BRN ID type not found - module data may not be loaded") + + # Create CR with unverified birth + cr = self._create_test_cr_with_detail( + birth_registration_number="UNVERIFIED123", + birth_verification_status="unverified", + ) + + # Apply the change request + apply_strategy = self.env["spp.cr.apply.add_member"] + apply_strategy.apply(cr) + + # Get the created individual + detail = cr.get_detail() + individual = detail.created_individual_id + self.assertTrue(individual, "Individual should be created") + + # Check registry ID was NOT created + registry_id = self.RegistryId.search( + [ + ("partner_id", "=", individual.id), + ("id_type_id", "=", self.brn_id_type.id), + ] + ) + + self.assertFalse(registry_id, "BRN registry ID should NOT be created for unverified") + + def test_apply_no_brn_when_not_found(self): + """Test that 'not_found' verification does not create BRN registry ID.""" + if not self.brn_id_type: + self.skipTest("BRN ID type not found - module data may not be loaded") + + # Create CR with not_found status + cr = self._create_test_cr_with_detail( + birth_registration_number="NOTFOUND123", + birth_verification_status="not_found", + ) + + # Apply the change request + apply_strategy = self.env["spp.cr.apply.add_member"] + apply_strategy.apply(cr) + + # Get the created individual + detail = cr.get_detail() + individual = detail.created_individual_id + + # Check registry ID was NOT created + registry_id = self.RegistryId.search( + [ + ("partner_id", "=", individual.id), + ("id_type_id", "=", self.brn_id_type.id), + ] + ) + + self.assertFalse(registry_id, "BRN registry ID should NOT be created for not_found") + + def test_apply_no_brn_when_no_brn_number(self): + """Test that verified CR without BRN number does not create registry ID.""" + if not self.brn_id_type: + self.skipTest("BRN ID type not found - module data may not be loaded") + + # Create CR verified but no BRN + cr = self._create_test_cr_with_detail( + birth_verification_status="verified", + birth_verification_date=fields.Datetime.now(), + # No birth_registration_number + ) + + # Apply the change request + apply_strategy = self.env["spp.cr.apply.add_member"] + apply_strategy.apply(cr) + + # Get the created individual + detail = cr.get_detail() + individual = detail.created_individual_id + + # Check registry ID was NOT created + registry_id = self.RegistryId.search( + [ + ("partner_id", "=", individual.id), + ("id_type_id", "=", self.brn_id_type.id), + ] + ) + + self.assertFalse(registry_id, "BRN registry ID should NOT be created without BRN number") + + def test_apply_updates_existing_brn(self): + """Test that applying updates existing BRN registry ID.""" + if not self.brn_id_type: + self.skipTest("BRN ID type not found - module data may not be loaded") + + # First, create an individual with an existing BRN + individual = self.env["res.partner"].create( + { + "name": "Existing Child", + "is_registrant": True, + "is_group": False, + } + ) + + # Create an existing unverified BRN + existing_brn = self.RegistryId.create( + { + "partner_id": individual.id, + "id_type_id": self.brn_id_type.id, + "value": "OLD_BRN", + "status": "invalid", + } + ) + + # Create CR with verified birth + cr = self._create_test_cr_with_detail( + birth_registration_number="NEW_BRN", + birth_verification_status="verified", + birth_verification_date=fields.Datetime.now(), + birth_verification_response='{"new": "data"}', + dci_data_source_id=self.data_source.id, + ) + + # Manually set the created_individual_id to our existing individual + detail = cr.get_detail() + detail.write( + { + "created_individual_id": individual.id, + } + ) + + # Call the BRN creation method directly + apply_strategy = self.env["spp.cr.apply.add_member"] + apply_strategy._create_verified_brn_registry_id(detail) + + # Refresh existing BRN + existing_brn.invalidate_recordset() + + # Check existing BRN was updated + self.assertEqual(existing_brn.value, "NEW_BRN") + self.assertEqual(existing_brn.status, "valid") + self.assertEqual(existing_brn.verification_method, "dci_api") + self.assertTrue(existing_brn.is_verified) + + +@tagged("post_install", "-at_install") +class TestRegistryIdVerification(TransactionCase): + """Test verification fields on spp.registry.id model.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.RegistryId = cls.env["spp.registry.id"] + + # Get an ID type + cls.id_type = cls.env["spp.vocabulary.code"].search( + [ + ("vocabulary_id.namespace_uri", "=", "urn:openspp:vocab:id-type"), + ], + limit=1, + ) + if not cls.id_type: + # Create a vocabulary and code for testing + vocab = cls.env["spp.vocabulary"].create( + { + "name": "ID Type", + "namespace_uri": "urn:openspp:vocab:id-type", + } + ) + cls.id_type = cls.env["spp.vocabulary.code"].create( + { + "vocabulary_id": vocab.id, + "code": "test_id", + "display": "Test ID", + } + ) + + # Create a test registrant + cls.registrant = cls.env["res.partner"].create( + { + "name": "Test Registrant", + "is_registrant": True, + "is_group": False, + } + ) + + def test_is_verified_computed_for_dci_api(self): + """Test is_verified is True for dci_api verification method.""" + registry_id = self.RegistryId.create( + { + "partner_id": self.registrant.id, + "id_type_id": self.id_type.id, + "value": "TEST123", + "verification_method": "dci_api", + } + ) + + self.assertTrue(registry_id.is_verified) + + def test_is_verified_computed_for_physical_document(self): + """Test is_verified is True for physical_document verification method.""" + registry_id = self.RegistryId.create( + { + "partner_id": self.registrant.id, + "id_type_id": self.id_type.id, + "value": "TEST456", + "verification_method": "physical_document", + } + ) + + self.assertTrue(registry_id.is_verified) + + def test_is_verified_false_for_verbal(self): + """Test is_verified is False for verbal verification method.""" + registry_id = self.RegistryId.create( + { + "partner_id": self.registrant.id, + "id_type_id": self.id_type.id, + "value": "TEST789", + "verification_method": "verbal", + } + ) + + self.assertFalse(registry_id.is_verified) + + def test_is_verified_false_for_self_declared(self): + """Test is_verified is False for self_declared verification method.""" + registry_id = self.RegistryId.create( + { + "partner_id": self.registrant.id, + "id_type_id": self.id_type.id, + "value": "TEST012", + "verification_method": "self_declared", + } + ) + + self.assertFalse(registry_id.is_verified) + + def test_is_verified_false_when_no_method(self): + """Test is_verified is False when no verification method set.""" + registry_id = self.RegistryId.create( + { + "partner_id": self.registrant.id, + "id_type_id": self.id_type.id, + "value": "TEST345", + } + ) + + self.assertFalse(registry_id.is_verified) diff --git a/spp_dci_demo/tests/test_birth_verification.py b/spp_dci_demo/tests/test_birth_verification.py new file mode 100644 index 00000000..cbe65cec --- /dev/null +++ b/spp_dci_demo/tests/test_birth_verification.py @@ -0,0 +1,266 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +"""Tests for DCI birth verification in Add Member change request.""" + +import json +from unittest.mock import MagicMock, patch + +from odoo.exceptions import UserError +from odoo.tests import TransactionCase, tagged + + +@tagged("post_install", "-at_install") +class TestBirthVerification(TransactionCase): + """Test birth verification via DCI in Add Member CR.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.DataSource = cls.env["spp.dci.data.source"] + cls.CRDetail = cls.env["spp.cr.detail.add_member"] + + # Create a test data source + cls.data_source = cls.DataSource.create( + { + "name": "Test CRVS", + "code": "test_crvs", + "base_url": "https://crvs.example.org/api", + "auth_type": "none", + "our_sender_id": "openspp.test", + "registry_type": "ns:org:RegistryType:Civil", + "active": True, + } + ) + + # Get or create a group for testing + cls.test_group = cls.env["res.partner"].create( + { + "name": "Test Household", + "is_registrant": True, + "is_group": True, + } + ) + + # Get or create a change request type + cls.request_type = cls.env["spp.change.request.type"].search([("code", "=", "add_member")], limit=1) + if not cls.request_type: + cls.request_type = cls.env["spp.change.request.type"].create( + { + "name": "Add Member", + "code": "add_member", + "detail_model": "spp.cr.detail.add_member", + "strategy_model": "spp.cr.apply.add_member", + } + ) + + def _create_test_cr_detail(self, **kwargs): + """Helper to create a test CR detail record.""" + # Create a change request first + cr = self.env["spp.change.request"].create( + { + "registrant_id": self.test_group.id, + "request_type_id": self.request_type.id, + } + ) + + # Create detail record + detail_vals = { + "registrant_id": self.test_group.id, + "change_request_id": cr.id, + "given_name": "Test", + "family_name": "Child", + "member_name": "CHILD, TEST", + } + detail_vals.update(kwargs) + detail = self.CRDetail.create(detail_vals) + + # Link the detail to the CR + cr.write({"detail_res_id": detail.id}) + + return detail + + def test_birth_verification_fields_exist(self): + """Test that DCI birth verification fields are present on CR detail.""" + detail = self._create_test_cr_detail() + + # Check field existence + self.assertIn("birth_registration_number", detail._fields) + self.assertIn("birth_verification_status", detail._fields) + self.assertIn("birth_verification_date", detail._fields) + self.assertIn("birth_verification_response", detail._fields) + self.assertIn("dci_data_source_id", detail._fields) + + def test_birth_verification_default_status(self): + """Test default verification status is 'unverified'.""" + detail = self._create_test_cr_detail() + self.assertEqual(detail.birth_verification_status, "unverified") + + def test_action_verify_birth_requires_brn(self): + """Test verify action fails without BRN.""" + detail = self._create_test_cr_detail() + + with self.assertRaises(UserError) as cm: + detail.action_verify_birth() + self.assertIn("Birth Registration Number", str(cm.exception)) + + def test_action_verify_birth_requires_data_source(self): + """Test verify action fails without data source.""" + # Deactivate all CRVS data sources + self.DataSource.search( + [ + ("registry_type", "=", "ns:org:RegistryType:Civil"), + ] + ).write({"active": False}) + + detail = self._create_test_cr_detail( + birth_registration_number="TEST123", + ) + + with self.assertRaises(UserError) as cm: + detail.action_verify_birth() + self.assertIn("data source", str(cm.exception).lower()) + + # Re-activate for other tests + self.data_source.active = True + + @patch("odoo.addons.spp_dci_client.services.client.DCIClient") + def test_action_verify_birth_success_direct_response(self, mock_client_class): + """Test successful verification with OpenCRVS direct response format.""" + # Mock DCI client response (OpenCRVS format) + mock_client = MagicMock() + mock_client.search_by_id_opencrvs.return_value = { + "identifier": [ + {"identifier_type": "UIN", "identifier_value": "5126797337"}, + {"identifier_type": "BRN", "identifier_value": "UP7D57VSEAZM"}, + ], + "name": {"given_name": "George", "second_name": "", "surname": "Smith"}, + "sex": "male", + "birth_date": "2026-02-05", + } + mock_client_class.return_value = mock_client + + detail = self._create_test_cr_detail( + birth_registration_number="UP7D57VSEAZM", + dci_data_source_id=self.data_source.id, + ) + + result = detail.action_verify_birth() + + # Verify status updated + self.assertEqual(detail.birth_verification_status, "verified") + self.assertTrue(detail.birth_verification_date) + self.assertTrue(detail.birth_verification_response) + + # Verify response is stored as JSON + response_data = json.loads(detail.birth_verification_response) + self.assertIn("identifier", response_data) + + # Verify notification is returned + self.assertEqual(result["type"], "ir.actions.client") + self.assertEqual(result["tag"], "display_notification") + self.assertEqual(result["params"]["type"], "success") + + @patch("odoo.addons.spp_dci_client.services.client.DCIClient") + def test_action_verify_birth_success_dci_format(self, mock_client_class): + """Test successful verification with standard DCI response format.""" + mock_client = MagicMock() + mock_client.search_by_id_opencrvs.return_value = { + "header": {"status": "succ"}, + "message": { + "search_response": [ + { + "status": "succ", + "data": [ + { + "identifier": [{"type": "BRN", "value": "TEST123"}], + "name": {"given_name": "Test"}, + } + ], + } + ] + }, + } + mock_client_class.return_value = mock_client + + detail = self._create_test_cr_detail( + birth_registration_number="TEST123", + dci_data_source_id=self.data_source.id, + ) + + detail.action_verify_birth() + + self.assertEqual(detail.birth_verification_status, "verified") + + @patch("odoo.addons.spp_dci_client.services.client.DCIClient") + def test_action_verify_birth_not_found(self, mock_client_class): + """Test verification with no matching record.""" + mock_client = MagicMock() + mock_client.search_by_id_opencrvs.return_value = { + "message": { + "search_response": [ + { + "status": "succ", + "data": [], + } + ] + }, + } + mock_client_class.return_value = mock_client + + detail = self._create_test_cr_detail( + birth_registration_number="NONEXISTENT", + dci_data_source_id=self.data_source.id, + ) + + result = detail.action_verify_birth() + + self.assertEqual(detail.birth_verification_status, "not_found") + self.assertEqual(result["params"]["type"], "warning") + + @patch("odoo.addons.spp_dci_client.services.client.DCIClient") + def test_action_verify_birth_error(self, mock_client_class): + """Test verification with API error.""" + mock_client = MagicMock() + mock_client.search_by_id_opencrvs.side_effect = Exception("API Error") + mock_client_class.return_value = mock_client + + detail = self._create_test_cr_detail( + birth_registration_number="TEST123", + dci_data_source_id=self.data_source.id, + ) + + with self.assertRaises(UserError) as cm: + detail.action_verify_birth() + + self.assertIn("API Error", str(cm.exception)) + # Note: The status update is rolled back with the transaction when exception is raised + # So we only verify the exception message, not the status + + def test_get_default_dci_data_source(self): + """Test getting default DCI data source.""" + detail = self._create_test_cr_detail() + + default_source = detail._get_default_dci_data_source() + + # Should find our test data source + self.assertTrue(default_source) + self.assertEqual(default_source.registry_type, "ns:org:RegistryType:Civil") + self.assertTrue(default_source.active) + + @patch("odoo.addons.spp_dci_client.services.client.DCIClient") + def test_parse_dci_response_error_status(self, mock_client_class): + """Test parsing response with error status in header.""" + mock_client = MagicMock() + mock_client.search_by_id_opencrvs.return_value = { + "header": {"status": "rjct"}, + "message": {}, + } + mock_client_class.return_value = mock_client + + detail = self._create_test_cr_detail( + birth_registration_number="TEST123", + dci_data_source_id=self.data_source.id, + ) + + detail.action_verify_birth() + + self.assertEqual(detail.birth_verification_status, "error") diff --git a/spp_dci_demo/tests/test_dci_verification_utils.py b/spp_dci_demo/tests/test_dci_verification_utils.py new file mode 100644 index 00000000..1ddce73a --- /dev/null +++ b/spp_dci_demo/tests/test_dci_verification_utils.py @@ -0,0 +1,414 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +from datetime import date + +from odoo.tests import TransactionCase, tagged + + +@tagged("post_install", "-at_install") +class TestParseDCIResponse(TransactionCase): + """Tests for parse_dci_response standalone function.""" + + def _get_function(self): + from odoo.addons.spp_dci_demo.utils.dci_verification import ( + parse_dci_response, + ) + + return parse_dci_response + + def test_dci_format_success_with_data(self): + """Standard DCI response with successful status and data returns 'verified'.""" + parse = self._get_function() + response = { + "header": {"status": "succ"}, + "message": { + "search_response": [ + { + "status": "succ", + "data": [{"name": {"given_name": "John"}}], + } + ] + }, + } + self.assertEqual(parse(response), "verified") + + def test_dci_format_success_no_data(self): + """DCI response with success status but empty data returns 'not_found'.""" + parse = self._get_function() + response = { + "header": {"status": "succ"}, + "message": { + "search_response": [ + { + "status": "succ", + "data": [], + } + ] + }, + } + self.assertEqual(parse(response), "not_found") + + def test_dci_format_rejected_header(self): + """DCI response with rejected header status returns 'error'.""" + parse = self._get_function() + response = { + "header": {"status": "rjct"}, + "message": {"search_response": []}, + } + self.assertEqual(parse(response), "error") + + def test_dci_format_rejected_search_response(self): + """DCI response with rejected search_response item returns 'error'.""" + parse = self._get_function() + response = { + "header": {"status": "succ"}, + "message": { + "search_response": [ + { + "status": "rjct", + "data": [], + } + ] + }, + } + self.assertEqual(parse(response), "error") + + def test_dci_format_empty_search_response(self): + """DCI response with empty search_response list returns 'not_found'.""" + parse = self._get_function() + response = { + "header": {"status": "succ"}, + "message": {"search_response": []}, + } + self.assertEqual(parse(response), "not_found") + + def test_opencrvs_direct_format_with_identifier(self): + """OpenCRVS direct response with 'identifier' key returns 'verified'.""" + parse = self._get_function() + response = { + "identifier": "BRN-12345", + "name": {"given_name": "John", "surname": "Doe"}, + } + self.assertEqual(parse(response), "verified") + + def test_opencrvs_direct_format_with_name_only(self): + """OpenCRVS direct response with 'name' key returns 'verified'.""" + parse = self._get_function() + response = { + "name": {"given_name": "John", "surname": "Doe"}, + } + self.assertEqual(parse(response), "verified") + + def test_unexpected_format_returns_not_found(self): + """Unknown response structure returns 'not_found'.""" + parse = self._get_function() + response = {"something_else": True} + self.assertEqual(parse(response), "not_found") + + def test_empty_dict_returns_not_found(self): + """Empty dict returns 'not_found'.""" + parse = self._get_function() + self.assertEqual(parse({}), "not_found") + + +@tagged("post_install", "-at_install") +class TestExtractPersonFromDCIResponse(TransactionCase): + """Tests for extract_person_from_dci_response standalone function.""" + + def _get_function(self): + from odoo.addons.spp_dci_demo.utils.dci_verification import ( + extract_person_from_dci_response, + ) + + return extract_person_from_dci_response + + def test_dci_format_list_data(self): + """Extract person from standard DCI format with data as list.""" + extract = self._get_function() + response = { + "message": { + "search_response": [ + { + "status": "succ", + "data": [ + { + "name": {"given_name": "Jane", "surname": "Doe"}, + "birth_date": "2024-01-15", + "sex": "Female", + } + ], + } + ] + }, + } + result = extract(response) + self.assertIsNotNone(result) + self.assertEqual(result["given_name"], "JANE") + self.assertEqual(result["family_name"], "DOE") + self.assertEqual(result["birth_date"], "2024-01-15") + self.assertEqual(result["sex"], "female") + + def test_dci_format_dict_data(self): + """Extract person from DCI format with data as dict.""" + extract = self._get_function() + response = { + "message": { + "search_response": [ + { + "status": "succ", + "data": { + "name": {"given_name": "Bob", "surname": "Smith"}, + "birthdate": "2023-06-01", + "gender": "Male", + }, + } + ] + }, + } + result = extract(response) + self.assertIsNotNone(result) + self.assertEqual(result["given_name"], "BOB") + self.assertEqual(result["family_name"], "SMITH") + self.assertEqual(result["birth_date"], "2023-06-01") + self.assertEqual(result["sex"], "male") + + def test_opencrvs_direct_format(self): + """Extract person from OpenCRVS direct response format.""" + extract = self._get_function() + response = { + "identifier": "BRN-12345", + "name": {"given_name": "Alice", "surname": "Johnson"}, + "birth_date": "2024-03-20", + "sex": "Female", + } + result = extract(response) + self.assertIsNotNone(result) + self.assertEqual(result["given_name"], "ALICE") + self.assertEqual(result["family_name"], "JOHNSON") + self.assertEqual(result["birth_date"], "2024-03-20") + self.assertEqual(result["sex"], "female") + + def test_name_as_string(self): + """Extract person when name is a plain string.""" + extract = self._get_function() + response = { + "name": "John Doe", + } + result = extract(response) + self.assertIsNotNone(result) + self.assertEqual(result["given_name"], "JOHN DOE") + self.assertNotIn("family_name", result) + + def test_no_person_data(self): + """Return None when no person data can be extracted.""" + extract = self._get_function() + response = {"something_else": True} + self.assertIsNone(extract(response)) + + def test_empty_search_response(self): + """Return None when search_response is empty.""" + extract = self._get_function() + response = { + "message": {"search_response": []}, + } + self.assertIsNone(extract(response)) + + def test_empty_data_in_search_response(self): + """Return None when data in search_response is empty.""" + extract = self._get_function() + response = { + "message": { + "search_response": [ + {"status": "succ", "data": []}, + ] + }, + } + self.assertIsNone(extract(response)) + + +@tagged("post_install", "-at_install") +class TestCheckDataMatches(TransactionCase): + """Tests for check_data_matches standalone function.""" + + def _get_function(self): + from odoo.addons.spp_dci_demo.utils.dci_verification import ( + check_data_matches, + ) + + return check_data_matches + + def test_all_fields_match(self): + """All fields matching returns (True, []).""" + check = self._get_function() + person_data = { + "given_name": "JOHN", + "family_name": "DOE", + "birth_date": "2024-01-15", + "sex": "male", + } + matches, mismatches = check( + person_data, + given_name="John", + family_name="Doe", + birthdate=date(2024, 1, 15), + gender_display="Male", + ) + self.assertTrue(matches) + self.assertEqual(mismatches, []) + + def test_given_name_mismatch(self): + """Given name mismatch returns (False, [mismatch info]).""" + check = self._get_function() + person_data = { + "given_name": "JOHN", + "family_name": "DOE", + } + matches, mismatches = check( + person_data, + given_name="Jane", + family_name="Doe", + birthdate=None, + gender_display=None, + ) + self.assertFalse(matches) + self.assertEqual(len(mismatches), 1) + self.assertIn("given_name", mismatches[0]) + + def test_family_name_mismatch(self): + """Family name mismatch returns (False, [mismatch info]).""" + check = self._get_function() + person_data = { + "given_name": "JOHN", + "family_name": "DOE", + } + matches, mismatches = check( + person_data, + given_name="John", + family_name="Smith", + birthdate=None, + gender_display=None, + ) + self.assertFalse(matches) + self.assertEqual(len(mismatches), 1) + self.assertIn("family_name", mismatches[0]) + + def test_birthdate_mismatch(self): + """Birthdate mismatch returns (False, [mismatch info]).""" + check = self._get_function() + person_data = { + "birth_date": "2024-01-15", + } + matches, mismatches = check( + person_data, + given_name=None, + family_name=None, + birthdate=date(2024, 6, 20), + gender_display=None, + ) + self.assertFalse(matches) + self.assertEqual(len(mismatches), 1) + self.assertIn("birthdate", mismatches[0]) + + def test_gender_mismatch(self): + """Gender mismatch returns (False, [mismatch info]).""" + check = self._get_function() + person_data = { + "sex": "male", + } + matches, mismatches = check( + person_data, + given_name=None, + family_name=None, + birthdate=None, + gender_display="Female", + ) + self.assertFalse(matches) + self.assertEqual(len(mismatches), 1) + self.assertIn("gender", mismatches[0]) + + def test_multiple_mismatches(self): + """Multiple mismatches returns all mismatch details.""" + check = self._get_function() + person_data = { + "given_name": "JOHN", + "family_name": "DOE", + "birth_date": "2024-01-15", + "sex": "male", + } + matches, mismatches = check( + person_data, + given_name="Jane", + family_name="Smith", + birthdate=date(2024, 6, 20), + gender_display="Female", + ) + self.assertFalse(matches) + self.assertEqual(len(mismatches), 4) + + def test_missing_dci_fields_still_match(self): + """When DCI data doesn't have a field, it's not considered a mismatch.""" + check = self._get_function() + person_data = { + "given_name": "JOHN", + } + matches, mismatches = check( + person_data, + given_name="John", + family_name="Doe", + birthdate=date(2024, 1, 15), + gender_display="Male", + ) + self.assertTrue(matches) + self.assertEqual(mismatches, []) + + def test_missing_cr_fields_still_match(self): + """When CR data is None/empty, those fields are not compared.""" + check = self._get_function() + person_data = { + "given_name": "JOHN", + "family_name": "DOE", + "birth_date": "2024-01-15", + "sex": "male", + } + matches, mismatches = check( + person_data, + given_name="John", + family_name="Doe", + birthdate=None, + gender_display=None, + ) + self.assertTrue(matches) + self.assertEqual(mismatches, []) + + def test_case_insensitive_name_comparison(self): + """Name comparison is case-insensitive.""" + check = self._get_function() + person_data = { + "given_name": "JOHN", + "family_name": "DOE", + } + matches, mismatches = check( + person_data, + given_name="john", + family_name="doe", + birthdate=None, + gender_display=None, + ) + self.assertTrue(matches) + self.assertEqual(mismatches, []) + + def test_whitespace_stripped_from_names(self): + """Leading/trailing whitespace is stripped before comparison.""" + check = self._get_function() + person_data = { + "given_name": "JOHN", + } + matches, mismatches = check( + person_data, + given_name=" John ", + family_name=None, + birthdate=None, + gender_display=None, + ) + self.assertTrue(matches) + self.assertEqual(mismatches, []) diff --git a/spp_dci_demo/utils/__init__.py b/spp_dci_demo/utils/__init__.py new file mode 100644 index 00000000..5b0c5427 --- /dev/null +++ b/spp_dci_demo/utils/__init__.py @@ -0,0 +1,3 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +from . import dci_verification diff --git a/spp_dci_demo/utils/dci_verification.py b/spp_dci_demo/utils/dci_verification.py new file mode 100644 index 00000000..db708ae8 --- /dev/null +++ b/spp_dci_demo/utils/dci_verification.py @@ -0,0 +1,170 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. + +"""Standalone DCI verification utility functions. + +These functions extract, parse, and compare DCI response data without +depending on Odoo models. Both the wizard and the detail model call +these functions instead of implementing their own logic. +""" + +import logging + +_logger = logging.getLogger(__name__) + + +def parse_dci_response(response): + """Parse DCI response to determine verification status. + + Args: + response: Dict response from DCI search + + Returns: + Verification status string: 'verified', 'not_found', or 'error' + """ + # Check for error status in header + if "header" in response: + header = response["header"] + if header.get("status") == "rjct": + return "error" + + # Check for search_response in message + if "message" in response: + message = response["message"] + if "search_response" in message: + search_responses = message["search_response"] + if search_responses: + # Check first response item + first_response = search_responses[0] + if first_response.get("status") == "succ": + # Check if we have data + data = first_response.get("data", []) + if data: + return "verified" + return "not_found" + elif first_response.get("status") == "rjct": + return "error" + return "not_found" + + # OpenCRVS direct response format (record directly in response) + if "identifier" in response or "name" in response: + return "verified" + + # Unexpected response format + _logger.warning( + "Unexpected DCI response structure, treating as not found. Keys: %s", + list(response.keys()) if isinstance(response, dict) else type(response), + ) + return "not_found" + + +def extract_person_from_dci_response(response): + """Extract normalized person data from DCI response. + + Args: + response: Dict response from DCI search + + Returns: + Dict with normalized person data or None if not found. + Keys: given_name, family_name, birth_date, sex (all uppercase names, + lowercase sex). + """ + person_data = None + + # Check for search_response in message (standard DCI format) + if "message" in response: + message = response["message"] + if "search_response" in message: + search_responses = message["search_response"] + if search_responses: + first_response = search_responses[0] + data = first_response.get("data", {}) + if data: + # Data can be a list or a dict depending on registry + if isinstance(data, list): + person_data = data[0] # First matching record + elif isinstance(data, dict): + # Check for reg_records (OpenCRVS SPDCI format) + if "reg_records" in data and data["reg_records"]: + person_data = data["reg_records"][0] + else: + person_data = data # Direct dict response + + # OpenCRVS direct response format + if not person_data and ("identifier" in response or "name" in response): + person_data = response + + if not person_data: + return None + + # Normalize the person data + normalized = {} + + # Extract name + if "name" in person_data: + name = person_data["name"] + if isinstance(name, dict): + normalized["given_name"] = name.get("given_name", "").strip().upper() + normalized["family_name"] = name.get("surname", "").strip().upper() + elif isinstance(name, str): + normalized["given_name"] = name.strip().upper() + + # Extract birth date + if "birth_date" in person_data: + normalized["birth_date"] = person_data["birth_date"] + elif "birthdate" in person_data: + normalized["birth_date"] = person_data["birthdate"] + + # Extract sex/gender + if "sex" in person_data: + normalized["sex"] = person_data["sex"].lower() + elif "gender" in person_data: + normalized["sex"] = person_data["gender"].lower() + + return normalized + + +def check_data_matches(person_data, given_name, family_name, birthdate, gender_display): + """Check if DCI data matches provided fields. + + Args: + person_data: Dict from extract_person_from_dci_response + given_name: Given name string from CR/wizard + family_name: Family name string from CR/wizard + birthdate: date object or None from CR/wizard + gender_display: Gender display string (e.g., "Male") or None + + Returns: + Tuple of (matches: bool, mismatches: list[str]) + """ + mismatches = [] + + # Compare given name + if person_data.get("given_name"): + cr_given_name = (given_name or "").strip().upper() + dci_given_name = person_data["given_name"] + if cr_given_name != dci_given_name: + mismatches.append(f"given_name: CR='{cr_given_name}' vs DCI='{dci_given_name}'") + + # Compare family name + if person_data.get("family_name"): + cr_family_name = (family_name or "").strip().upper() + dci_family_name = person_data["family_name"] + if cr_family_name != dci_family_name: + mismatches.append(f"family_name: CR='{cr_family_name}' vs DCI='{dci_family_name}'") + + # Compare birth date + if person_data.get("birth_date") and birthdate: + cr_birthdate = str(birthdate) + dci_birthdate = person_data["birth_date"] + # Handle different date formats (YYYY-MM-DD) + if cr_birthdate[:10] != dci_birthdate[:10]: + mismatches.append(f"birthdate: CR='{cr_birthdate}' vs DCI='{dci_birthdate}'") + + # Compare gender/sex + if person_data.get("sex") and gender_display: + cr_gender = gender_display.lower() + dci_sex = person_data["sex"].lower() + if cr_gender != dci_sex: + mismatches.append(f"gender: CR='{cr_gender}' vs DCI='{dci_sex}'") + + return (len(mismatches) == 0, mismatches) diff --git a/spp_dci_demo/views/change_request_view.xml b/spp_dci_demo/views/change_request_view.xml new file mode 100644 index 00000000..b8406b0b --- /dev/null +++ b/spp_dci_demo/views/change_request_view.xml @@ -0,0 +1,35 @@ + + + + + spp.change.request.form.dci + spp.change.request + + + + +
+
+ +
+
+ DCI Birth Verification +
+ +
+
+
+ + + +
+
+
+
diff --git a/spp_dci_demo/views/cr_detail_add_member_view.xml b/spp_dci_demo/views/cr_detail_add_member_view.xml new file mode 100644 index 00000000..4d01b9f5 --- /dev/null +++ b/spp_dci_demo/views/cr_detail_add_member_view.xml @@ -0,0 +1,82 @@ + + + + + spp.cr.detail.add_member.form.dci + spp.cr.detail.add_member + + + + + 1 + + + + + 1 + + + + + + + + + + + + + + + +
+
+
+
+
+
+
diff --git a/spp_mis_demo_v2/data/event_types.xml b/spp_mis_demo_v2/data/event_types.xml index 9436c65d..9280ef5b 100644 --- a/spp_mis_demo_v2/data/event_types.xml +++ b/spp_mis_demo_v2/data/event_types.xml @@ -20,6 +20,19 @@ + + + Health Visit + health_visit + Health checkup visits for children under 5, required for Conditional Child Grant compliance. Tracks immunization and growth monitoring. + visit + both + internal + 5 + + Training Session diff --git a/spp_mis_demo_v2/models/demo_programs.py b/spp_mis_demo_v2/models/demo_programs.py index 15372d0f..d38f14be 100644 --- a/spp_mis_demo_v2/models/demo_programs.py +++ b/spp_mis_demo_v2/models/demo_programs.py @@ -10,13 +10,14 @@ 2. Demonstrate different CEL expression patterns 3. Link to existing Logic Packs from spp_studio -Program Catalog (6 programs): +Program Catalog (7 programs): 1. Universal Child Grant - Member aggregation (child_benefit pack) -2. Elderly Social Pension - Age + constants (social_pension pack) -3. Emergency Relief Fund - Cached metrics (vulnerability_assessment pack) -4. Cash Transfer Program - Poverty threshold (cash_transfer_basic pack) -5. Disability Support Grant - Member existence (disability_assistance pack) -6. Food Assistance - Basic active check (no pack, simple CEL) +2. Conditional Child Grant - First 1,000 days with compliance (child_benefit pack) +3. Elderly Social Pension - Age + constants (social_pension pack) +4. Emergency Relief Fund - Cached metrics (vulnerability_assessment pack) +5. Cash Transfer Program - Poverty threshold (cash_transfer_basic pack) +6. Disability Support Grant - Member existence (disability_assistance pack) +7. Food Assistance - Basic active check (no pack, simple CEL) CEL Expression Patterns Demonstrated: - Field comparison: r.active == true @@ -24,6 +25,7 @@ - Aggregate variables: hh_total_income < poverty_line, child_count > 0 - Compound conditions: dependency_ratio >= 1.5 or (is_female_headed and elderly_count > 0) - Arithmetic with variables: base_child_grant * child_count, disabled_count * disability_grant_per_member +- Compliance criteria: members.exists(m, age_years(m.birthdate) < 5) """ # Demo programs aligned with spec and Logic Packs @@ -55,6 +57,37 @@ "Logic Pack: child_benefit", ], }, + { + "id": "conditional_child_grant", + "name": "Conditional Child Grant", + "description": "Monthly grant for households with pregnant women and children aged 0-2. " + "Targets the critical first 1,000 days of life to support nutrition and " + "health-seeking behavior. Compliance requires prenatal visits, health " + "checkups, and immunizations.", + "target_type": "group", + "entitlement_amount": 10.0, + "entitlement_formula": "first_1000_days_grant", + "cycle_duration": 30, # Monthly + # CEL: Households with children under 2 (first 1,000 days) + # Pattern: Member age check via members.exists() + "cel_expression": "r.is_group == true and members.exists(m, age_years(m.birthdate) < 2)", + # Compliance: prenatal visits, health checkups, immunizations + "compliance_cel_expression": "members.exists(m, age_years(m.birthdate) <= 2)", + # Link to Logic Pack + "logic_pack": "child_benefit", + "use_logic_studio": True, + "logic_name": "Conditional Child Grant Eligibility", + "expression_type": "filter", + "stories": [], + "demo_points": [ + "Conditional cash transfer", + "First 1,000 days targeting (0-2 years)", + "Health visit and immunization compliance", + "Compliance manager with CEL expression", + "CEL: members.exists() for eligibility and compliance", + "Logic Pack: child_benefit", + ], + }, { "id": "elderly_social_pension", "name": "Elderly Social Pension", diff --git a/spp_mis_demo_v2/models/demo_variables.py b/spp_mis_demo_v2/models/demo_variables.py index d344b001..3a57e0ff 100644 --- a/spp_mis_demo_v2/models/demo_variables.py +++ b/spp_mis_demo_v2/models/demo_variables.py @@ -46,6 +46,7 @@ # Fixed program amounts "elderly_pension_amount": 100, "cash_transfer_amount": 150, + "first_1000_days_grant": 10, # Monthly per-beneficiary } diff --git a/spp_mis_demo_v2/models/mis_demo_generator.py b/spp_mis_demo_v2/models/mis_demo_generator.py index 49a83846..83efec58 100644 --- a/spp_mis_demo_v2/models/mis_demo_generator.py +++ b/spp_mis_demo_v2/models/mis_demo_generator.py @@ -1095,6 +1095,10 @@ def _create_demo_programs(self, stats): if program_def.get("cycle_duration"): self._configure_cycle_manager(program, program_def) + # Configure compliance manager if compliance CEL expression specified + if program_def.get("compliance_cel_expression"): + self._configure_compliance_manager(program, program_def) + except Exception as e: _logger.error( "Error creating program (program_id=%s): %s", @@ -1199,6 +1203,50 @@ def _configure_cycle_manager(self, program, program_def): e, ) + def _configure_compliance_manager(self, program, program_def): + """Configure the compliance manager with a CEL expression. + + Sets the compliance CEL expression for ongoing beneficiary verification. + """ + try: + compliance_manager = program.get_manager(program.MANAGER_COMPLIANCE) + if not compliance_manager: + _logger.warning( + "No compliance manager found for program (program_id=%s)", + program.id, + ) + return + + cel_expression = program_def.get("compliance_cel_expression") + if not cel_expression: + return + + if "compliance_cel_expression" not in compliance_manager._fields: + _logger.info( + "Compliance CEL not available for program (program_id=%s)", + program.id, + ) + return + + compliance_manager.write( + { + "compliance_cel_mode": "cel", + "compliance_cel_expression": cel_expression, + } + ) + _logger.info( + "Configured compliance CEL for program (program_id=%s): %s", + program.id, + cel_expression, + ) + + except Exception as e: + _logger.warning( + "Could not configure compliance manager for program (program_id=%s): %s", + program.id, + e, + ) + def _configure_eligibility_manager(self, program, program_def): """Configure the eligibility manager with CEL expression. diff --git a/spp_registry/models/reg_id.py b/spp_registry/models/reg_id.py index aec3d031..bfd1c41e 100644 --- a/spp_registry/models/reg_id.py +++ b/spp_registry/models/reg_id.py @@ -38,6 +38,40 @@ class SPPRegistrantID(models.Model): description = fields.Char() + # Verification fields + verification_method = fields.Selection( + selection=[ + ("dci_api", "DCI API Verification"), + ("physical_document", "Physical Document"), + ("scanned", "Scanned Document"), + ("verbal", "Verbal (Unverified)"), + ("self_declared", "Self Declared"), + ("manual_lookup", "Manual Lookup"), + ("biometric", "Biometric Match"), + ], + string="Verification Method", + help="How this ID was verified", + ) + is_verified = fields.Boolean( + string="Verified", + default=False, + compute="_compute_is_verified", + store=True, + help="Whether this ID has been verified", + ) + verification_date = fields.Datetime( + string="Verification Date", + help="When the ID was verified", + ) + verification_source = fields.Char( + string="Verification Source", + help="System/person that verified this ID (e.g., 'OpenCRVS', 'Staff: John')", + ) + verification_response = fields.Text( + string="Verification Response", + help="Raw response or notes from verification", + ) + _unique_partner_id_type = models.Constraint( "UNIQUE(partner_id, id_type_id)", "A registrant cannot have duplicate ID types", @@ -69,6 +103,13 @@ def _name_search(self, name, domain=None, operator="ilike", limit=100, order=Non domain = [("partner_id", operator, name)] + domain return self._search(domain, limit=limit, order=order) + @api.depends("verification_method") + def _compute_is_verified(self): + """Compute is_verified based on method - verbal/self_declared are not verified.""" + unverified_methods = {"verbal", "self_declared", False} + for record in self: + record.is_verified = record.verification_method not in unverified_methods + @api.constrains("value") @api.onchange("value") def _onchange_id_validation(self): diff --git a/spp_registry/views/reg_id_view.xml b/spp_registry/views/reg_id_view.xml index 896539cb..40a7902d 100644 --- a/spp_registry/views/reg_id_view.xml +++ b/spp_registry/views/reg_id_view.xml @@ -3,6 +3,48 @@ Part of OpenSPP. See LICENSE file for full copyright and licensing details. --> + + spp.registry.id.form + spp.registry.id + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + +
+
+
+ view_reg_id_tree spp.registry.id @@ -19,6 +61,8 @@ /> + + @@ -51,7 +95,7 @@ Registrant IDs ir.actions.act_window spp.registry.id - list + list,form {} []