How to Build a Safe, Autonomous Prior Authorization Agent for Healthcare Revenue Cycle Management with Human-in-the-Loop Controls


def _now_iso() -> str:
   return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"


def _stable_id(prefix: str, seed: str) -> str:
   h = hashlib.sha256(seed.encode("utf-8")).hexdigest()[:10]
   return f"{prefix}_{h}"


class MockEHR:
   def __init__(self):
       self.orders_queue: List[SurgeryOrder] = []
       self.patient_docs: Dict[str, List[ClinicalDocument]] = {}


   def seed_data(self, n_orders: int = 5):
       random.seed(7)


       def make_patient(i: int) -> Patient:
           pid = f"PT{i:04d}"
           plan = random.choice(list(InsurancePlan))
           return Patient(
               patient_id=pid,
               name=f"Patient {i}",
               dob="1980-01-01",
               member_id=f"M{i:08d}",
               plan=plan,
           )


       def docs_for_order(patient: Patient, surgery: SurgeryType) -> List[ClinicalDocument]:
           base = [
               ClinicalDocument(
                   doc_id=_stable_id("DOC", patient.patient_id + "H&P"),
                   doc_type=DocType.H_AND_P,
                   created_at=_now_iso(),
                   content="H&P: Relevant history, exam findings, and surgical indication.",
                   source="EHR",
               ),
               ClinicalDocument(
                   doc_id=_stable_id("DOC", patient.patient_id + "NOTE"),
                   doc_type=DocType.CLINICAL_NOTE,
                   created_at=_now_iso(),
                   content="Clinical note: Symptoms, conservative management attempted, clinician assessment.",
                   source="EHR",
               ),
               ClinicalDocument(
                   doc_id=_stable_id("DOC", patient.patient_id + "MEDS"),
                   doc_type=DocType.MED_LIST,
                   created_at=_now_iso(),
                   content="Medication list: Current meds, allergies, contraindications.",
                   source="EHR",
               ),
           ]


           maybe = []
           if surgery in [SurgeryType.KNEE_ARTHROPLASTY, SurgeryType.SPINE_FUSION, SurgeryType.BARIATRIC]:
               maybe.append(
                   ClinicalDocument(
                       doc_id=_stable_id("DOC", patient.patient_id + "LABS"),
                       doc_type=DocType.LABS,
                       created_at=_now_iso(),
                       content="Labs: CBC/CMP within last 30 days.",
                       source="LabSystem",
                   )
               )


           if surgery in [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]:
               maybe.append(
                   ClinicalDocument(
                       doc_id=_stable_id("DOC", patient.patient_id + "IMG"),
                       doc_type=DocType.IMAGING,
                       created_at=_now_iso(),
                       content="Imaging: MRI/X-ray report supporting diagnosis and severity.",
                       source="Radiology",
                   )
               )


           final = base + [d for d in maybe if random.random() > 0.35]


           if random.random() > 0.6:
               final.append(
                   ClinicalDocument(
                       doc_id=_stable_id("DOC", patient.patient_id + "PRIOR_TX"),
                       doc_type=DocType.PRIOR_TX,
                       created_at=_now_iso(),
                       content="Prior treatments: PT, meds, injections tried over 6+ weeks.",
                       source="EHR",
                   )
               )


           if random.random() > 0.5:
               final.append(
                   ClinicalDocument(
                       doc_id=_stable_id("DOC", patient.patient_id + "CONSENT"),
                       doc_type=DocType.CONSENT,
                       created_at=_now_iso(),
                       content="Consent: Signed procedure consent and risk disclosure.",
                       source="EHR",
                   )
               )


           return final


       for i in range(1, n_orders + 1):
           patient = make_patient(i)
           surgery = random.choice(list(SurgeryType))
           order = SurgeryOrder(
               order_id=_stable_id("ORD", patient.patient_id + surgery.value),
               patient=patient,
               surgery_type=surgery,
               scheduled_date=(datetime.utcnow().date() + timedelta(days=random.randint(3, 21))).isoformat(),
               ordering_provider_npi=str(random.randint(1000000000, 1999999999)),
               diagnosis_codes=["M17.11", "M54.5"] if surgery != SurgeryType.CATARACT else ["H25.9"],
               created_at=_now_iso(),
           )
           self.orders_queue.append(order)
           self.patient_docs[patient.patient_id] = docs_for_order(patient, surgery)


   def poll_new_surgery_orders(self, max_n: int = 1) -> List[SurgeryOrder]:
       pulled = self.orders_queue[:max_n]
       self.orders_queue = self.orders_queue[max_n:]
       return pulled


   def get_patient_documents(self, patient_id: str) -> List[ClinicalDocument]:
       return list(self.patient_docs.get(patient_id, []))


   def fetch_additional_docs(self, patient_id: str, needed: List[DocType]) -> List[ClinicalDocument]:
       generated = []
       for dt in needed:
           generated.append(
               ClinicalDocument(
                   doc_id=_stable_id("DOC", patient_id + dt.value + str(time.time())),
                   doc_type=dt,
                   created_at=_now_iso(),
                   content=f"Auto-collected document for {dt.value}: extracted and formatted per payer policy.",
                   source="AutoCollector",
               )
           )
       self.patient_docs.setdefault(patient_id, []).extend(generated)
       return generated


class MockPayerPortal:
   def __init__(self):
       self.db: Dict[str, Dict[str, Any]] = {}
       random.seed(11)


   def required_docs_policy(self, plan: InsurancePlan, surgery: SurgeryType) -> List[DocType]:
       base = [DocType.H_AND_P, DocType.CLINICAL_NOTE, DocType.MED_LIST]
       if surgery in [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]:
           base += [DocType.IMAGING, DocType.LABS, DocType.PRIOR_TX]
       if surgery == SurgeryType.BARIATRIC:
           base += [DocType.LABS, DocType.PRIOR_TX]
       if plan in [InsurancePlan.PAYER_BETA, InsurancePlan.PAYER_GAMMA]:
           base += [DocType.CONSENT]
       return sorted(list(set(base)), key=lambda x: x.value)


   def submit(self, pa: PriorAuthRequest) -> PayerResponse:
       payer_ref = _stable_id("PAYREF", pa.request_id + _now_iso())
       docs_present = {d.doc_type for d in pa.docs_attached}
       required = self.required_docs_policy(pa.order.patient.plan, pa.order.surgery_type)
       missing = [d for d in required if d not in docs_present]


       self.db[payer_ref] = {
           "status": AuthStatus.SUBMITTED,
           "order_id": pa.order.order_id,
           "plan": pa.order.patient.plan,
           "surgery": pa.order.surgery_type,
           "missing": missing,
           "polls": 0,
           "submitted_at": _now_iso(),
           "denial_reason": None,
       }


       msg = "Submission received. Case queued for review."
       if missing:
           msg += " Initial validation indicates incomplete documentation."
       return PayerResponse(status=AuthStatus.SUBMITTED, payer_ref=payer_ref, message=msg)


   def check_status(self, payer_ref: str) -> PayerResponse:
       if payer_ref not in self.db:
           return PayerResponse(
               status=AuthStatus.DENIED,
               payer_ref=payer_ref,
               message="Case not found (possible payer system error).",
               denial_reason=DenialReason.OTHER,
               confidence=0.4,
           )


       case = self.db[payer_ref]
       case["polls"] += 1


       if case["status"] == AuthStatus.SUBMITTED and case["polls"] >= 1:
           case["status"] = AuthStatus.IN_REVIEW


       if case["status"] == AuthStatus.IN_REVIEW and case["polls"] >= 3:
           if case["missing"]:
               case["status"] = AuthStatus.DENIED
               case["denial_reason"] = DenialReason.MISSING_DOCS
           else:
               roll = random.random()
               if roll < 0.10:
                   case["status"] = AuthStatus.DENIED
                   case["denial_reason"] = DenialReason.CODING_ISSUE
               elif roll < 0.18:
                   case["status"] = AuthStatus.DENIED
                   case["denial_reason"] = DenialReason.MEDICAL_NECESSITY
               else:
                   case["status"] = AuthStatus.APPROVED


       if case["status"] == AuthStatus.DENIED:
           dr = case["denial_reason"] or DenialReason.OTHER
           missing = case["missing"] if dr == DenialReason.MISSING_DOCS else []
           conf = 0.9 if dr != DenialReason.OTHER else 0.55
           return PayerResponse(
               status=AuthStatus.DENIED,
               payer_ref=payer_ref,
               message=f"Denied. Reason={dr.value}.",
               denial_reason=dr,
               missing_docs=missing,
               confidence=conf,
           )


       if case["status"] == AuthStatus.APPROVED:
           return PayerResponse(
               status=AuthStatus.APPROVED,
               payer_ref=payer_ref,
               message="Approved. Authorization issued.",
               confidence=0.95,
           )


       return PayerResponse(
           status=case["status"],
           payer_ref=payer_ref,
           message=f"Status={case['status'].value}. Polls={case['polls']}.",
           confidence=0.9,
       )


   def file_appeal(self, payer_ref: str, appeal_text: str, attached_docs: List[ClinicalDocument]) -> PayerResponse:
       if payer_ref not in self.db:
           return PayerResponse(
               status=AuthStatus.DENIED,
               payer_ref=payer_ref,
               message="Appeal failed: case not found.",
               denial_reason=DenialReason.OTHER,
               confidence=0.4,
           )


       case = self.db[payer_ref]
       docs_present = {d.doc_type for d in attached_docs}
       still_missing = [d for d in case["missing"] if d not in docs_present]
       case["missing"] = still_missing
       case["status"] = AuthStatus.APPEALED
       case["polls"] = 0


       msg = "Appeal submitted and queued for review."
       if still_missing:
           msg += f" Warning: still missing {', '.join([d.value for d in still_missing])}."
       return PayerResponse(status=AuthStatus.APPEALED, payer_ref=payer_ref, message=msg, confidence=0.9)



Source link

  • Related Posts

    Google AI Introduces Gemini Embedding 2: A Multimodal Embedding Model that Lets Your Bring Text, Images, Video, Audio, and Docs into the Embedding Space

    Google expanded its Gemini model family with the release of Gemini Embedding 2. This second-generation model succeeds the text-only gemini-embedding-001 and is designed specifically to address the high-dimensional storage and…

    Fish Audio Releases Fish Audio S2: A New Generation of Expressive Text-to-Speech (TTS) with Absurdly Controllable Emotion

    The landscape of Text-to-Speech (TTS) is moving away from modular pipelines toward integrated Large Audio Models (LAMs). Fish Audio’s release of S2-Pro, the flagship model within the Fish Speech ecosystem,…

    Leave a Reply

    Your email address will not be published. Required fields are marked *