From adb7d11293815693aadc7b5bb6c1f5cbbd2207ba Mon Sep 17 00:00:00 2001 From: Robbert Krebbers <mail@robbertkrebbers.nl> Date: Thu, 4 Jul 2019 13:58:18 +0200 Subject: [PATCH] Distributed mapper. --- theories/examples/mapper.v | 204 ++++++++++++++++++++++++++++++++++ theories/utils/contribution.v | 40 ++++--- 2 files changed, 229 insertions(+), 15 deletions(-) create mode 100644 theories/examples/mapper.v diff --git a/theories/examples/mapper.v b/theories/examples/mapper.v new file mode 100644 index 0000000..1f13162 --- /dev/null +++ b/theories/examples/mapper.v @@ -0,0 +1,204 @@ +From stdpp Require Import sorting. +From osiris.channel Require Import proto_channel proofmode. +From iris.heap_lang Require Import proofmode notation. +From iris.heap_lang Require Import assert. +From osiris.utils Require Import list compare spin_lock contribution. +From osiris.examples Require Import list_sort_elem. +From iris.algebra Require Import gmultiset. + +Definition mapper : val := + rec: "go" "f" "l" "c" := + acquire "l";; + send "c" #true;; + if: ~recv "c" then release "l" else + let: "x" := recv "c" in + release "l";; + let: "y" := "f" "x" in + acquire "l";; + send "c" #false;; + send "c" "y";; + release "l";; + "go" "f" "l" "c". + +Definition start_mappers : val := + rec: "go" "n" "f" "l" "c" := + if: "n" = #0 then #() else + Fork (mapper "f" "l" "c");; + "go" ("n" - #1) "f" "l" "c". + +Definition loop_mappers : val := + rec: "go" "n" "c" "xs" "ys" := + if: "n" = #0 then "ys" else + if: recv "c" then + if: lisnil "xs" then + send "c" #false;; + "go" ("n" - #1) "c" "xs" "ys" + else + send "c" #true;; + send "c" (lhead "xs");; + "go" "n" "c" (ltail "xs") "ys" + else + let: "y" := recv "c" in + "go" "n" "c" "xs" (lcons "y" "ys"). + +Definition mapper_service : val := λ: "n" "f" "xs", + let: "l" := new_lock #() in + let: "c" := start_chan (λ: "c", + start_mappers "n" "f" "l" "c") in + loop_mappers "n" "c" "xs" (lnil #()). + +Class map_sortG Σ A `{Countable A} := { + map_sort_contributionG :> contributionG Σ (gmultisetUR A); + map_sort_lockG :> lockG Σ; +}. + +Section mapper. + Context `{Countable A, Countable B}. + Context `{!heapG Σ, !proto_chanG Σ, map_sortG Σ A} (N : namespace). + Context (IA : A → val → iProp Σ) (IB : B → val → iProp Σ) (f : A → B). + Local Open Scope nat_scope. + + Definition mapper_protocol_aux (rec : nat -d> gmultiset A -d> iProto Σ) : + nat -d> gmultiset A -d> iProto Σ := λ i X, + let rec : nat → gmultiset A → iProto Σ := rec in + (if i is 0 then END else + ((<!> x v, MSG v {{ IA x v }}; rec i (X ⊎ {[ x ]})) + <+> + rec (pred i) X + ) <{⌜ i ≠1 ∨ X = ∅ âŒ}&> + <?> x w, MSG w {{ ⌜ x ∈ X ⌠∧ IB (f x) w }}; rec i (X ∖ {[ x ]}))%proto. + Instance mapper_protocol_aux_contractive : Contractive mapper_protocol_aux. + Proof. solve_proper_prepare. f_equiv. solve_proto_contractive. Qed. + Definition mapper_protocol := fixpoint mapper_protocol_aux. + Global Instance mapper_protocol_unfold n X : + ProtoUnfold (mapper_protocol n X) (mapper_protocol_aux mapper_protocol n X). + Proof. apply proto_unfold_eq, (fixpoint_unfold mapper_protocol_aux). Qed. + + Definition mapper_lock_inv (γ : gname) (c : val) : iProp Σ := + (∃ i X, server γ i X ∗ c ↣ iProto_dual (mapper_protocol i X) @ N)%I. + + Lemma mapper_spec γ (ff : val) lk c q : + (∀ x v, {{{ IA x v }}} ff v {{{ w, RET w; IB (f x) w }}}) -∗ + {{{ is_lock N lk (mapper_lock_inv γ c) ∗ + unlocked N lk q ∗ client γ (∅ : gmultiset A) }}} + mapper ff #lk c + {{{ RET #(); True }}}. + Proof. + iIntros "#Hf !>" (Φ) "(#Hlk & Hu & Hγ) HΦ". iLöb as "IH". + wp_rec; wp_pures. + wp_apply (acquire_spec with "[$Hlk $Hu]"); iIntros "[Hl H]". + iDestruct "H" as (i X) "[Hs Hc]". + iDestruct (@server_agree with "Hs Hγ") as %[??]; destruct i as [|i]=>//=. + iAssert ⌜ S i ≠1 ∨ X = ∅ âŒ%I as %?. + { destruct i as [|i]; last auto with lia. + iDestruct (@server_1_agree with "Hs Hγ") as %?%leibniz_equiv; auto. } + wp_select. wp_branch; wp_pures; last first. + { iMod (@dealloc_client with "Hs Hγ") as "Hs /=". + wp_apply (release_spec with "[$Hlk $Hl Hc Hs]"). + { iExists i, _. iFrame. } + iIntros "_". by iApply "HΦ". } + wp_recv (x v) as "HI". + iMod (@update_client with "Hs Hγ") as "[Hs Hγ]". + { apply (gmultiset_local_update_alloc _ _ {[ x ]}). } + rewrite left_id_L. + wp_apply (release_spec with "[$Hlk $Hl Hc Hs]"). + { iExists (S i), _. iFrame. } + clear dependent i X. iIntros "Hu". wp_apply ("Hf" with "HI"); iIntros (w) "HI". + wp_apply (acquire_spec with "[$Hlk $Hu]"); iIntros "[Hl H]". + iDestruct "H" as (i X) "[Hs Hc]". + iDestruct (@server_agree with "Hs Hγ") + as %[??%gmultiset_included]; destruct i as [|i]=>//=. + wp_select. wp_send with "[$HI]". + { by rewrite gmultiset_elem_of_singleton_subseteq. } + iMod (@update_client with "Hs Hγ") as "[Hs Hγ]". + { by apply (gmultiset_local_update_dealloc _ _ {[ x ]}). } + rewrite gmultiset_difference_diag. + wp_apply (release_spec with "[$Hlk $Hl Hc Hs]"). + { iExists (S i), _. iFrame. } + iIntros "Hu". by wp_apply ("IH" with "[$] [$]"). + Qed. + + Lemma start_mappers_spec γ (n : nat) (ff : val) lk c q : + (∀ x v, {{{ IA x v }}} ff v {{{ w, RET w; IB (f x) w }}}) -∗ + {{{ is_lock N lk (mapper_lock_inv γ c) ∗ unlocked N lk q ∗ + n * client γ (∅:gmultiset A) }}} + start_mappers #n ff #lk c + {{{ RET #(); True }}}. + Proof. + iIntros "#Hf !>" (Φ) "(#Hlk & Hu & Hγs) HΦ". + iInduction n as [|n] "IH" forall (q); wp_rec; wp_pures; simpl. + { by iApply "HΦ". } + iDestruct "Hγs" as "[Hγ Hγs]"; iDestruct "Hu" as "[Hu Hu']". + wp_apply (wp_fork with "[Hγ Hu]"). + { iNext. wp_apply (mapper_spec with "Hf [$]"); auto. } + wp_pures. rewrite Nat2Z.inj_succ Z.sub_1_r Z.pred_succ. + wp_apply ("IH" with "[$] [$] [$]"). + Qed. + + Lemma loop_mappers_spec (n : nat) c vs xs ws X_send xs_recv : + (n = 0 → X_send = ∅ ∧ xs = []) → + {{{ + c ↣ mapper_protocol n (X_send : gmultiset A) @ N ∗ + ([∗ list] x;v ∈ xs;vs, IA x v) ∗ ([∗ list] x;w ∈ xs_recv;ws, IB (f x) w) + }}} + loop_mappers #n c (val_encode vs) (val_encode ws) + {{{ ys ws, RET (val_encode ws); + ⌜ys ++ (f <$> xs_recv) ≡ₚ f <$> (elements X_send ++ xs ++ xs_recv)⌠∗ + [∗ list] y;w ∈ ys ++ (f <$> xs_recv);ws, IB y w + }}}. + Proof. + iIntros (Hn Φ) "(Hc & Hxs & Hxs_recv) HΦ". + iLöb as "IH" forall (n vs xs ws X_send xs_recv Hn Φ); wp_rec; wp_pures; simpl. + case_bool_decide; wp_pures; simplify_eq/=. + { destruct Hn as [-> ->]; first lia. + iApply ("HΦ" $! []); simpl. rewrite big_sepL2_fmap_l. auto. } + destruct n as [|n]=> //=. wp_branch as %?|%_; wp_pures. + - wp_apply (lisnil_spec (A:=val) with "[//]"); iIntros (_). + destruct vs as [|v vs], xs as [|x xs]; csimpl; try done; wp_pures. + + wp_select. wp_pures. rewrite Nat2Z.inj_succ Z.sub_1_r Z.pred_succ. + iApply ("IH" with "[] Hc [//] [$] HΦ"). iPureIntro; naive_solver. + + iDestruct "Hxs" as "[Hx Hxs]". wp_select. + wp_apply (lhead_spec (A:=val) with "[//]"); iIntros (_). + wp_send with "[$Hx]". + wp_apply (ltail_spec (A:=val) with "[//]"); iIntros (_). + wp_apply ("IH" with "[] Hc Hxs Hxs_recv"); first done. + iIntros (ys ws'). + rewrite gmultiset_elements_disj_union gmultiset_elements_singleton -!assoc_L. + iApply "HΦ". + - wp_recv (x w) as (HH) "HIfx". + wp_apply (lcons_spec (A:=val) with "[//]"); iIntros (_). + wp_apply ("IH" $! _ _ _ _ _ (_ :: _) with "[] Hc Hxs [HIfx Hxs_recv]"); first done. + { simpl; iFrame. } + iIntros (ys ws'); iDestruct 1 as (Hys) "H"; simplify_eq/=. + iApply ("HΦ" $! (ys ++ [f x])). iSplit. + + iPureIntro. rewrite -assoc_L /= Hys. + rewrite {2}(gmultiset_disj_union_difference {[ x ]} X_send) + -?gmultiset_elem_of_singleton_subseteq //. + rewrite (comm disj_union) gmultiset_elements_disj_union. + rewrite gmultiset_elements_singleton. + by rewrite -assoc_L (assoc_L _ [x]) (comm _ [x]) -!assoc_L. + + by rewrite -assoc_L. + Qed. + + Lemma mapper_service_spec (n : nat) (ff : val) vs xs : + 0 < n → + (∀ x v, {{{ IA x v }}} ff v {{{ w, RET w; IB (f x) w }}}) -∗ + {{{ [∗ list] x;v ∈ xs;vs, IA x v }}} + mapper_service #n ff (val_encode vs) + {{{ ys ws, RET (val_encode ws); ⌜ys ≡ₚ f <$> xs⌠∗ + [∗ list] y;w ∈ ys;ws, IB y w }}}. + Proof. + iIntros (?) "#Hf !>"; iIntros (Φ) "HI HΦ". wp_lam; wp_pures. + wp_apply (new_lock_spec N with "[//]"); iIntros (lk) "[Hu Hlk]". + wp_apply (start_chan_proto_spec N (mapper_protocol n ∅) with "[Hu Hlk]"); + try iNext; iIntros (c) "Hc". + { wp_lam. + iMod (contribution_initN (A:=gmultisetUR A) n) as (γ) "[Hs Hγs]". + iMod ("Hlk" $! (mapper_lock_inv γ c) with "[Hc Hs]") as "#Hlk". + { iExists n, ∅. iFrame. } + wp_apply (start_mappers_spec with "Hf [$Hlk $Hu $Hγs]"); auto. } + wp_pures. wp_apply (lnil_spec with "[//]"); iIntros (_). + wp_apply (loop_mappers_spec _ _ _ _ [] ∅ [] with "[$Hc $HI //]"); first lia. + iIntros (ys ws). rewrite /= !right_id_L gmultiset_elements_empty. iApply "HΦ". + Qed. +End mapper. diff --git a/theories/utils/contribution.v b/theories/utils/contribution.v index 2777837..020ae78 100644 --- a/theories/utils/contribution.v +++ b/theories/utils/contribution.v @@ -1,6 +1,7 @@ From iris.base_logic Require Export base_logic lib.iprop lib.own. From iris.proofmode Require Export tactics. From iris.algebra Require Import excl auth csum gmultiset frac_auth. +From iris.algebra Require Export local_updates. Class contributionG Σ (A : ucmraT) `{!CmraDiscrete A} := { contribution_inG :> inG Σ @@ -19,6 +20,12 @@ Definition client `{contributionG Σ A} (γ : gname) (x : A) : iProp Σ := Typeclasses Opaque client. Instance: Params (@client) 5. +(** MOVE *) +Fixpoint bi_mult {PROP : bi} (n : nat) (P : PROP) : PROP := + match n with O => emp | S n => P ∗ bi_mult n P end%I. +Arguments bi_mult {_} _ _%I. +Notation "n * P" := (bi_mult n P) : bi_scope. + Section contribution. Context `{contributionG Σ A}. Implicit Types x y : A. @@ -77,11 +84,10 @@ Section contribution. by destruct n. Qed. - Lemma alloc_client γ n x x' y' : - (x,ε) ~l~> (x',y') → - server γ n x ==∗ server γ (S n) x' ∗ client γ y'. + Lemma alloc_client γ n x : + server γ n x ==∗ server γ (S n) x ∗ client γ ε. Proof. - intros Hup. rewrite /server /client. + rewrite /server /client. destruct (decide (n = 0)) as [->|?]; case_decide; try done. - iDestruct 1 as (Hx) "[Hs Hc]"; setoid_subst. iMod (own_update_2 with "Hs Hc") as "[$ $]"; last done. @@ -99,28 +105,24 @@ Section contribution. by apply option_local_update, csum_local_update_l, prod_local_update_2. Qed. - Lemma dealloc_client γ n x y : - Cancelable y → - server γ n (x â‹… y) -∗ client γ y ==∗ server γ (pred n) x. + Lemma dealloc_client γ n x : + server γ n x -∗ client γ ε ==∗ server γ (pred n) x. Proof. - iIntros (?) "Hs Hc". iDestruct (server_valid with "Hs") as %Hv. + iIntros "Hs Hc". iDestruct (server_valid with "Hs") as %Hv. destruct (decide (n = 1)) as [->|]; simpl. - - iDestruct (server_1_agree with "Hs Hc") as %Hxy. - move: Hxy. rewrite {1}(comm _ x) -{2}(right_id ε _ y)=> /cancelable. - rewrite {1}comm=> /(_ Hv) ->. rewrite left_id. + - iDestruct (server_1_agree with "Hs Hc") as %->. rewrite /server /client; repeat case_decide=> //. iMod (own_update_2 with "Hs Hc") as "[$ $]"; last done. by apply auth_update, option_local_update, (replace_local_update _ _). - - iDestruct (server_agree with "Hs Hc") as %[? [z Hxy]]. - move: Hxy. rewrite (comm _ x)=> /cancelable. - rewrite {1}comm=> /(_ Hv) ->. + - iDestruct (server_agree with "Hs Hc") as %[? [z ->]]. rewrite /server /client. destruct n as [|[|n]]; case_decide=>//=. iApply (own_update_2 with "Hs Hc"). apply auth_update_dealloc. rewrite -(right_id _ _ (Some (Cinl (1%positive, _)))). rewrite Nat2Pos.inj_succ // -Pos.add_1_l. - rewrite -pair_op -Cinl_op Some_op. apply (cancel_local_update _ _ _). + rewrite -pair_op -Cinl_op Some_op left_id. apply (cancel_local_update _ _ _). Qed. + Lemma update_client γ n x y x' y' : (x,y) ~l~> (x',y') → server γ n x -∗ client γ y ==∗ server γ n x' ∗ client γ y'. @@ -137,4 +139,12 @@ Section contribution. by apply auth_update, option_local_update, csum_local_update_l, prod_local_update_2. Qed. + + (** Derived *) + Lemma contribution_initN n : (|==> ∃ γ, server γ n ε ∗ n * client γ ε)%I. + Proof. + iMod (contribution_init) as (γ) "Hs". iExists γ. + iInduction n as [|n] "IH"; first by iFrame. + iMod ("IH" with "Hs") as "[Hs $]". by iApply alloc_client. + Qed. End contribution. -- GitLab