Skip to content
Snippets Groups Projects

Prove queue SPMC message passing

Merged Jaehwang Jung requested to merge jaehwang/spmc-empty into graphs_multi
All threads resolved!
4 files
+ 302
12
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 288
0
From iris.algebra Require Import auth numbers.
From iris.proofmode Require Import tactics.
From gpfsl.lang Require Import notation.
From gpfsl.lang Require Import notation.
From gpfsl.logic Require Import logatom invariants proofmode.
From gpfsl.logic Require Import new_delete repeat_loop.
From gpfsl.gps Require Import surface_iSP protocols escrows.
From gpfsl.examples.queue Require Import spec_graph.
From gpfsl.examples Require Import set_helper.
Require Import iris.prelude.options.
Local Open Scope Z_scope.
Local Notation graph := (graph qevent).
Implicit Types (G : graph).
Section spmc_mp.
Context `{!noprolG Σ,
!gpsG Σ boolProtocol,
!inG Σ (graphR qevent),
!inG Σ (authR natUR)}.
Context (mpN qN : namespace)
(DISJ1: mpN ## histN) (DISJ2: qN ## histN) (DISJ3: mpN ## qN).
Definition mpinvN := mpN .@ "inv".
Definition flagN := mpN .@ "flag".
Hypothesis Q : weak_queue_spec Σ.
Local Existing Instances QueueInv_Objective QueueLocal_Persistent.
Local Notation iProp := (iProp Σ).
Local Notation vProp := (vProp Σ).
Definition spmc_mp : expr :=
let: "q" := Q.(new_queue) [] in
let: "flag" := new [ #1] in
"flag" <- #0 ;;
Fork (**) (**) (Q.(enqueue) ["q"; #1] ;;
(**) (**) Q.(enqueue) ["q"; #2] ;;
(**) (**) "flag" <-ʳᵉˡ #1 ) ;;
(**) (**)
Fork (**) (Q.(dequeue) ["q"]);; (**)
(**) (**)
(repeat: !ᵃᶜ "flag") ;; (**) (**)
Q.(dequeue) ["q"] (**) (**)
.
(** Invariant on the queue state:
- A successful dequeue consumes a token [◯ 1].
- There are only 2 such tokens.
- Only the values 1 or 2 are enqueued. *)
Definition MPInv γg γm q : vProp := G,
Q.(QueueInv) γg q G
own γm ( 2%nat)
own γm ( (size G.(com)))
e eV v, G.(Es) !! e = Some eV eV.(ge_event) = Enq v v = 1 v = 2
.
(** The flag location transfers the knowledge about the enqueue events. *)
Definition FlagProtocol γg q s (v : val) : vProp :=
match s with
| false => v = #0
| true => v = #1 G M e1 e2 eV1 eV2,
Q.(QueueLocal) qN γg q G M
e1 e2
G.(Es) !! e1 = Some eV1 G.(Es) !! e2 = Some eV2
eV1.(ge_event) = Enq 1 eV2.(ge_event) = Enq 2
e1 M e2 M
end.
Definition FlagInterp γg q : interpO Σ boolProtocol :=
λ _ _ _ _, FlagProtocol γg q.
Global Instance FlagProtocol_persistent γg q s v :
Persistent (FlagProtocol γg q s v).
Proof. rewrite /Persistent. destruct s; by iIntros "#FP". Qed.
Global Instance FlagInterp_persistent γg q s l γl t b v V :
Persistent (FlagInterp γg q s l γl t b v V).
Proof. destruct b; apply _. Qed.
Lemma spmc_mp_spec tid :
{{{ True }}} spmc_mp @ tid; {{{ v, RET #v; v = 1 v = 2 }}}.
Proof using All.
iIntros (Φ) "_ Post".
rewrite /spmc_mp.
wp_apply (new_queue_spec _ qN with "[//]").
iIntros (γg q) "[#QL QI]".
iMod (own_alloc (( 2%nat 0%nat) ( 1%nat 1%nat))) as (γm) "[●m [◯m1 ◯m2]]".
{ rewrite -assoc -!auth_frag_op !nat_op /=. by apply auth_both_valid. }
wp_let.
wp_apply wp_new; [done..|].
iIntros (f) "(_ & f↦ & _)". rewrite own_loc_na_vec_singleton.
wp_let. wp_write.
(* setup the invariant and the protocol *)
iMod (GPS_iSP_Init flagN (FlagInterp γg q) (FlagInterp γg q false) f false with "f↦ [//]")
as (γf ?) "FPW".
iDestruct (GPS_iSP_SWWriter_Reader with "FPW") as "#FPR".
iMod (inv_alloc mpinvN _ (MPInv γg γm q) with "[QI ●m]") as "#I".
{ iNext. iExists ∅. iFrame "QI". iDestruct "●m" as "[$ $]". done. }
(* enqueuing thread *)
wp_apply (wp_fork with "[FPW]"); [done|..].
{ iIntros "!>" (?).
(* enqueue 1 *)
iDestruct (monPred_in_intro True%I with "[//]") as (V) "[#⊒V _]".
awp_apply (enqueue_spec with "⊒V QL"); [solve_ndisj|done|].
iInv mpinvN as (G1) "(QI & >●m & >◯m & >%Hv)".
iAaccIntro with "QI".
{ iFrame. iIntros "QI !> !>". iExists _. iFrame. done. }
iIntros (V1 G1' enqId1 enq1 Venq1 M1') "(QI & #⊒V1 & #QL1 & %F) !>".
set (eV1 := mkGraphEvent enq1 Venq1 M1') in *.
rewrite /is_enqueue in F.
destruct F as (SubG11' &_&_&_& -> & ? & EsG1' &_& ComG1' & InM1' &_).
have HeV1 : G1'.(Es) !! enqId1 = Some eV1.
{ subst enqId1. rewrite EsG1'. apply lookup_app_1_eq. }
iDestruct (view_at_elim with "⊒V1 QL1") as "{QL QL1} QL1".
iSplitR "FPW".
{ iNext. iExists G1'. rewrite -ComG1'. iFrame. iPureIntro.
rewrite EsG1'. intros ??? [?|[-> <-]]%lookup_app_1; [by eapply Hv|].
injection 1 as <-. by left. }
clear Hv. iIntros "_". wp_seq.
(* enqueue 2 *)
awp_apply (enqueue_spec with "⊒V QL1"); [solve_ndisj|done|].
iInv mpinvN as (G2) "(QI & >●m & >◯m & >%Hv)".
rewrite QueueInv_graph_master_acc. iDestruct "QI" as "[>Gm QI]".
iDestruct (graph_master_consistent with "Gm") as %EGC.
iPoseProof (QueueLocal_graph_snap with "QL1") as "Gs".
iDestruct (graph_master_snap_included with "Gm Gs") as %SubG1'2.
iSpecialize ("QI" with "[$Gm]").
iAaccIntro with "QI".
{ iFrame. iIntros "QI !> !>". iExists _. iFrame. done. }
iIntros (V2 G2' enqId2 enq2 Venq2 M2') "(QI & #⊒V2 & #QL2 & %F) !>".
set (eV2 := mkGraphEvent enq2 Venq2 M2') in *.
rewrite /is_enqueue in F.
destruct F as (SubG22' & SubM1'2' &_&_& -> & ? & EsG2' &_& ComG2' & InM2' &_).
have HeV2 : G2'.(Es) !! enqId2 = Some eV2.
{ subst enqId2. rewrite EsG2'. apply lookup_app_1_eq. }
iDestruct (view_at_elim with "⊒V2 QL2") as "{QL2} QL2".
iSplitR "FPW".
{ iNext. iExists G2'. rewrite -ComG2'. iFrame. iPureIntro.
rewrite EsG2'. intros ??? [?|[-> <-]]%lookup_app_1; [by eapply Hv|].
injection 1 as <-. by right. }
iIntros "_". wp_seq.
(* set the flag *)
iApply (GPS_iSP_SWWrite flagN (FlagInterp γg q) (FlagInterp γg q false)
True%I _ AcqRel _ _ true _ #1 with "[$FPW]");
[solve_ndisj|done|done|by right|done|..].
{ simpl. iSplitL; [|done].
iIntros "!> * % !>". iSplitR; [done|].
iExists G2', M2', enqId1, enqId2, eV1, eV2. iFrame "QL2".
iPureIntro. split_and!; [| |done|done|done| |done].
- subst enqId1 enqId2.
have ? : (length (Es G1) < length (Es G1')).
{ rewrite EsG1' app_length /=. lia. }
suff ? : (length (Es G1') length (Es G2))%nat by lia.
apply prefix_length. by destruct SubG1'2.
- eapply prefix_lookup; [done|].
destruct SubG1'2, SubG22'. by trans G2.(Es).
- set_solver +InM1' SubM1'2'. }
by iIntros "!> * _". }
iIntros "_". wp_seq.
(* dequeuing thread *)
wp_apply (wp_fork with "[◯m1]"); [done|..].
{ iIntros "!>" (?).
iDestruct (monPred_in_intro True%I with "[//]") as (V) "[#⊒V _]".
awp_apply (dequeue_spec with "⊒V QL"); [solve_ndisj|].
iInv mpinvN as (G) "(QI & >●m & >◯m & >%Hv)".
iDestruct (QueueInv_QueueConsistent with "QI") as "#>%QC".
iAaccIntro with "QI".
{ iFrame. iIntros "QI !> !>". iExists _. iFrame. done. }
iIntros (v V' G' enqId deqId enq deq Vdeq M') "(QI & #⊒V' & #QL' & %F) !>".
iSplitL; last by iIntros. iNext. iExists G'.
set (dV := mkGraphEvent deq Vdeq M') in *.
destruct F as (SubG' & SubM' & HVin & HVcomm & [CASE|CASE]).
- destruct CASE as (_& -> &_& EsG' &_& ComG' &_).
rewrite ComG'. iFrame. iPureIntro.
rewrite EsG'. intros ??? [?|[-> <-]]%lookup_app_1; [by eapply Hv|done].
- destruct CASE as (_& -> & -> &_& NoSo & EsG' &_& ComG' & _).
iFrame. iCombine "◯m ◯m1" as "◯m".
rewrite ComG' size_union.
+ rewrite size_singleton (comm plus). iFrame. iPureIntro.
rewrite EsG'. intros ??? [?|[-> <-]]%lookup_app_1; [by eapply Hv|done].
+ apply disjoint_singleton_l. rewrite -(bsq_cons_so_com _ QC).
by apply NoSo. }
iIntros "_". wp_seq.
(* main thread *)
iClear "QL".
(* wait for the flag to be set *)
wp_bind (repeat: _)%E.
iLöb as "IH".
iApply wp_repeat; [done|].
iApply (GPS_iSP_Read flagN (FlagInterp γg q) (FlagInterp γg q false) (FlagInterp γg q false f γf)
with "[$FPR]"); [solve_ndisj|done|done|by right|..].
{ iIntros "!>" (????) "!>". iSplit; last iSplit; by iIntros "#$". }
iIntros "!>" (? s' v'). simpl.
iDestruct 1 as "[% [FPR' FI]]". destruct s'; simpl; last first.
{ (* keep looping *)
iDestruct "FI" as %->.
iExists 0. iSplit; [done|]. simpl.
by iApply ("IH" with "Post ◯m2"). }
(* done looping *)
iClear "IH FPR FPR'".
iDestruct "FI" as "[-> QL]".
iExists 1. iSplit; [done|]. simpl. iIntros "!> !>". wp_seq.
iDestruct "QL" as (G0 M0 ????) "(#QL & %NEe12 & %HeV1 & %HeV2 & %Henq1 & %Henq2 & %HM1 & %HM2)".
iDestruct (monPred_in_intro True%I with "[//]") as (V) "[#⊒V _]".
iApply (wp_step_fupd _ _ _ _ ( _, _ -∗ _)%I with "[$Post]"); [auto..|].
awp_apply (dequeue_spec with "⊒V QL"); [solve_ndisj|].
iInv mpinvN as (G) "(QI & >●m & >◯m & >%Hv)".
iDestruct (QueueInv_QueueConsistent with "QI") as "#>%QC".
rewrite QueueInv_graph_master_acc. iDestruct "QI" as "[>Gm QI]".
iDestruct (graph_master_consistent with "Gm") as %EGC.
iPoseProof (QueueLocal_graph_snap with "QL") as "Gs".
iDestruct (graph_master_snap_included with "Gm Gs") as %SubG0G.
iSpecialize ("QI" with "[$Gm]").
apply (prefix_lookup _ G.(Es)) in HeV1; [|by destruct SubG0G].
apply (prefix_lookup _ G.(Es)) in HeV2; [|by destruct SubG0G].
iCombine "◯m ◯m2" as "◯m".
iDestruct (own_valid_2 with "●m ◯m") as %[LT%nat_included _]%auth_both_valid_discrete.
have {}LT : (size (com G) 1)%nat by lia.
iAaccIntro with "QI".
{ iDestruct "◯m" as "[◯m $]". iFrame. iIntros "QI !> !>". iExists _. iFrame. done. }
iIntros (v V' G' enqId deqId enq deq Vdeq M') "(QI & #⊒V' & #QL' & %F)".
set (dV := mkGraphEvent deq Vdeq M') in *.
iDestruct (QueueInv_QueueConsistent with "QI") as "#>%QC'".
destruct F as (SubG' & SubM' & HVin & HVcomm & [CASE|CASE]).
- (* empty pop *) exfalso.
destruct CASE as (-> & -> & ? & EsG' & _ & ComG' & ?).
have HdV : G'.(Es) !! deqId = Some dV.
{ subst deqId. rewrite EsG'. apply lookup_app_1_eq. }
case (decide (unmatched_enq_2 G e1)) => [[_ UNMATCHED1]|MATCHED1].
+ (* Empty dequeue couldn't have seen an unmatched enqueue. Contradiction. *)
refine (_ (bsq_cons_non_empty _ QC' _ _ HdV ltac:(done) e1 _ eV1 _ ltac:(done) _)); cycle 1.
{ rewrite EsG'. eapply prefix_lookup; [done|by eexists]. }
{ set_solver +SubM' HM1. }
intros (d' & Com_ed' & _). rewrite ComG' -(bsq_cons_so_com _ QC) in Com_ed'.
apply (UNMATCHED1 d'); [|done].
specialize (egcons_so _ EGC _ _ Com_ed') as (_ & dV' & _ & HdV' & _).
apply lookup_lt_Some in HdV'.
rewrite /to_set. apply elem_of_set_seq. lia.
+ (* Since e1 is matched, e2 couldn't have been matched. *)
have UNMATCHED2 : set_Forall (λ d2 : nat, (e2, d2) so G) (to_set (Es G)).
{ intros d2 In2 So2. apply MATCHED1. split.
{ exists 1. apply (f_equal (fmap ge_event)) in HeV1.
move: HeV1. rewrite /= Henq1 //. }
intros d1 In1 So1.
rewrite -(bsq_cons_so_com _ QC) in LT.
have INCL : {[(e1,d1); (e2,d2)]} G.(so) by set_solver +So1 So2.
move: INCL => /(subseteq_size _ _).
rewrite size_union; [|set_solver +NEe12].
rewrite !size_singleton. lia. }
refine (_ (bsq_cons_non_empty _ QC' _ _ HdV ltac:(done) e2 _ eV2 _ ltac:(done) _)); cycle 1.
{ rewrite EsG'. eapply prefix_lookup; [done|by eexists]. }
{ set_solver +SubM' HM2. }
intros (d' & Com_ed' & _). rewrite ComG' -(bsq_cons_so_com _ QC) in Com_ed'.
apply (UNMATCHED2 d'); [|done].
specialize (egcons_so _ EGC _ _ Com_ed') as (_ & dV' & _ & HdV' & _).
apply lookup_lt_Some in HdV'.
rewrite /to_set. apply elem_of_set_seq. lia.
- (* successful dequeue *)
rewrite /is_enqueue /is_dequeue in CASE.
destruct CASE as (?& -> & -> & ? & NoSo & EsG' & _ & ComG' &_&_&_& eV & HeV & ? &_).
iIntros "!>". iSplitL.
{ iNext. iExists _. iFrame.
rewrite ComG' size_union.
- rewrite size_singleton (comm plus). iFrame. iPureIntro.
rewrite EsG'. intros ??? [?|[-> <-]]%lookup_app_1; [by eapply Hv|done].
- apply disjoint_singleton_l. rewrite -(bsq_cons_so_com _ QC).
by apply NoSo. }
iIntros "_ Post". iApply "Post". iPureIntro. by apply (Hv _ _ _ HeV).
Qed.
End spmc_mp.
Loading