diff --git a/_CoqProject b/_CoqProject index 5cfc8818a8b4f5726b7c10d1c852e933437db3d8..b79a001159cb23c2735dff82d18ebe401d9d934c 100644 --- a/_CoqProject +++ b/_CoqProject @@ -14,5 +14,5 @@ theories/examples/sort_client.v theories/examples/sort_elem.v theories/examples/loop_sort.v theories/examples/sort_elem_client.v -theories/examples/mapper.v +theories/examples/map.v theories/examples/map_reduce.v diff --git a/theories/examples/mapper.v b/theories/examples/map.v similarity index 52% rename from theories/examples/mapper.v rename to theories/examples/map.v index 8ea8d3052c02e85685a21f9eb4f057b701bf2aa6..f134e622fe6cdbeb1b83dace6eb2e9d879e94b50 100644 --- a/theories/examples/mapper.v +++ b/theories/examples/map.v @@ -1,32 +1,37 @@ From osiris.channel Require Import proto_channel proofmode. -From iris.heap_lang Require Import proofmode notation. -From osiris.utils Require Import list spin_lock contribution. +From iris.heap_lang Require Import proofmode notation lib.spin_lock. +From osiris.utils Require Import list contribution. From iris.algebra Require Import gmultiset. -Definition mapper : val := - rec: "go" "f" "l" "c" := +Definition map_worker : val := + rec: "go" "map" "l" "c" := acquire "l";; send "c" #true;; if: ~recv "c" then release "l" else let: "x" := recv "c" in release "l";; - let: "y" := "f" "x" in + let: "y" := "map" "x" in acquire "l";; send "c" #false;; send "c" "y";; release "l";; - "go" "f" "l" "c". + "go" "map" "l" "c". -Definition start_mappers : val := - rec: "go" "n" "f" "l" "c" := +Definition start_map_workers : val := + rec: "go" "n" "map" "l" "c" := if: "n" = #0 then #() else - Fork (mapper "f" "l" "c");; - "go" ("n" - #1) "f" "l" "c". + Fork (map_worker "map" "l" "c");; + "go" ("n" - #1) "map" "l" "c". -Definition mapper_service_loop : val := +Definition start_map_service : val := λ: "n" "map", + start_chan (λ: "c", + let: "l" := newlock #() in + start_map_workers "n" "map" "l" "c"). + +Definition par_map_server : val := rec: "go" "n" "c" "xs" "ys" := if: "n" = #0 then "ys" else - if: recv "c" then (* send item to mapper *) + if: recv "c" then (* send item to map_worker *) if: lisnil "xs" then send "c" #false;; "go" ("n" - #1) "c" "xs" "ys" @@ -34,26 +39,32 @@ Definition mapper_service_loop : val := send "c" #true;; send "c" (lhead "xs");; "go" "n" "c" (ltail "xs") "ys" - else (* receive item from mapper *) + else (* receive item from map_worker *) let: "zs" := recv "c" in "go" "n" "c" "xs" (lapp "zs" "ys"). -Definition mapper_service : val := λ: "n" "f" "xs", - let: "l" := new_lock #() in - let: "c" := start_chan (λ: "c", start_mappers "n" "f" "l" "c") in - mapper_service_loop "n" "c" "xs" (lnil #()). +Definition par_map : val := λ: "n" "map" "xs", + let: "c" := start_map_service "n" "map" in + par_map_server "n" "c" "xs" (lnil #()). -Class mapperG Σ A `{Countable A} := { - mapper_contributionG :> contributionG Σ (gmultisetUR A) +Class mapG Σ A `{Countable A} := { + map_contributionG :> contributionG Σ (gmultisetUR A); + map_lockG :> lockG Σ; }. -Section mapper. +Section map. Context `{Countable A} {B : Type}. - Context `{!heapG Σ, !proto_chanG Σ, !lockG Σ, !mapperG Σ A} (N : namespace). - Context (IA : A → val → iProp Σ) (IB : B → val → iProp Σ) (f : A → list B). + Context `{!heapG Σ, !proto_chanG Σ, !mapG Σ A} (N : namespace). + Context (IA : A → val → iProp Σ) (IB : B → val → iProp Σ) (map : A → list B). Local Open Scope nat_scope. + Implicit Types n : nat. + + Definition map_spec (vmap : val) : iProp Σ := (∀ x v, + {{{ IA x v }}} + vmap v + {{{ ws, RET (llist ws); [∗ list] y;w ∈ map x;ws, IB y w }}})%I. - Definition mapper_protocol_aux (rec : nat -d> gmultiset A -d> iProto Σ) : + Definition map_worker_protocol_aux (rec : nat -d> gmultiset A -d> iProto Σ) : nat -d> gmultiset A -d> iProto Σ := λ i X, let rec : nat → gmultiset A → iProto Σ := rec in (if i is 0 then END else @@ -61,29 +72,27 @@ Section mapper. <+> rec (pred i) X ) <{⌜ i ≠1 ∨ X = ∅ âŒ}&> - <?> x ws, MSG llist ws {{ ⌜ x ∈ X ⌠∧ [∗ list] y;w ∈ f x;ws, IB y w }}; + <?> x ws, MSG llist ws {{ ⌜ x ∈ X ⌠∧ [∗ list] y;w ∈ map x;ws, IB y w }}; rec i (X ∖ {[ x ]}))%proto. - Instance mapper_protocol_aux_contractive : Contractive mapper_protocol_aux. + Instance map_worker_protocol_aux_contractive : Contractive map_worker_protocol_aux. Proof. solve_proper_prepare. f_equiv. solve_proto_contractive. Qed. - Definition mapper_protocol := fixpoint mapper_protocol_aux. - Global Instance mapper_protocol_unfold n X : - ProtoUnfold (mapper_protocol n X) (mapper_protocol_aux mapper_protocol n X). - Proof. apply proto_unfold_eq, (fixpoint_unfold mapper_protocol_aux). Qed. + Definition map_worker_protocol := fixpoint map_worker_protocol_aux. + Global Instance map_worker_protocol_unfold n X : + ProtoUnfold (map_worker_protocol n X) (map_worker_protocol_aux map_worker_protocol n X). + Proof. apply proto_unfold_eq, (fixpoint_unfold map_worker_protocol_aux). Qed. - Definition mapper_lock_inv (γ : gname) (c : val) : iProp Σ := - (∃ i X, server γ i X ∗ c ↣ iProto_dual (mapper_protocol i X) @ N)%I. + Definition map_worker_lock_inv (γ : gname) (c : val) : iProp Σ := + (∃ i X, server γ i X ∗ c ↣ iProto_dual (map_worker_protocol i X) @ N)%I. - Lemma mapper_spec γ (vf : val) lk c q : - (∀ x v, - {{{ IA x v }}} vf v {{{ ws, RET (llist ws); [∗ list] y;w ∈ f x;ws, IB y w }}}) -∗ - {{{ is_lock N lk (mapper_lock_inv γ c) ∗ - unlocked N lk q ∗ client γ (∅ : gmultiset A) }}} - mapper vf #lk c + Lemma map_worker_spec γl γ vmap lk c : + map_spec vmap -∗ + {{{ is_lock N γl lk (map_worker_lock_inv γ c) ∗ client γ (∅ : gmultiset A) }}} + map_worker vmap lk c {{{ RET #(); True }}}. Proof. - iIntros "#Hf !>" (Φ) "(#Hlk & Hu & Hγ) HΦ". iLöb as "IH". + iIntros "#Hmap !>" (Φ) "[#Hlk Hγ] HΦ". iLöb as "IH". wp_rec; wp_pures. - wp_apply (acquire_spec with "[$Hlk $Hu]"); iIntros "[Hl H]". + wp_apply (acquire_spec with "Hlk"); iIntros "[Hl H]". iDestruct "H" as (i X) "[Hs Hc]". iDestruct (@server_agree with "Hs Hγ") as %[??]; destruct i as [|i]=>//=. iAssert ⌜ S i ≠1 ∨ X = ∅ âŒ%I as %?. @@ -100,7 +109,7 @@ Section mapper. rewrite left_id_L. wp_apply (release_spec with "[$Hlk $Hl Hc Hs]"). { iExists (S i), _. iFrame. } - clear dependent i X. iIntros "Hu". wp_apply ("Hf" with "HI"); iIntros (w) "HI". + clear dependent i X. iIntros "Hu". wp_apply ("Hmap" with "HI"); iIntros (w) "HI". wp_apply (acquire_spec with "[$Hlk $Hu]"); iIntros "[Hl H]". iDestruct "H" as (i X) "[Hs Hc]". iDestruct (@server_agree with "Hs Hγ") @@ -115,33 +124,47 @@ Section mapper. iIntros "Hu". by wp_apply ("IH" with "[$] [$]"). Qed. - Lemma start_mappers_spec γ (n : nat) (vf : val) lk c q : - (∀ x v, - {{{ IA x v }}} vf v {{{ ws, RET (llist ws); [∗ list] y;w ∈ f x;ws, IB y w }}}) -∗ - {{{ is_lock N lk (mapper_lock_inv γ c) ∗ unlocked N lk q ∗ - client γ (∅:gmultiset A) ^ n }}} - start_mappers #n vf #lk c + Lemma start_map_workers_spec γl γ n vmap lk c : + map_spec vmap -∗ + {{{ is_lock N γl lk (map_worker_lock_inv γ c) ∗ client γ (∅:gmultiset A) ^ n }}} + start_map_workers #n vmap lk c {{{ RET #(); True }}}. Proof. - iIntros "#Hf !>" (Φ) "(#Hlk & Hu & Hγs) HΦ". - iInduction n as [|n] "IH" forall (q); wp_rec; wp_pures; simpl. + iIntros "#Hmap !>" (Φ) "[#Hlk Hγs] HΦ". + iInduction n as [|n] "IH"; wp_rec; wp_pures; simpl. { by iApply "HΦ". } - iDestruct "Hγs" as "[Hγ Hγs]"; iDestruct "Hu" as "[Hu Hu']". - wp_apply (wp_fork with "[Hγ Hu]"). - { iNext. wp_apply (mapper_spec with "Hf [$]"); auto. } + iDestruct "Hγs" as "[Hγ Hγs]". + wp_apply (wp_fork with "[Hγ]"). + { iNext. wp_apply (map_worker_spec with "Hmap [$]"); auto. } wp_pures. rewrite Nat2Z.inj_succ Z.sub_1_r Z.pred_succ. - wp_apply ("IH" with "[$] [$] [$]"). + wp_apply ("IH" with "[$] [$]"). + Qed. + + Lemma start_map_service_spec n vmap : + map_spec vmap -∗ + {{{ True }}} + start_map_service #n vmap + {{{ c, RET c; c ↣ map_worker_protocol n ∅ @ N }}}. + Proof. + iIntros "#Hf !>"; iIntros (Φ _) "HΦ". wp_lam; wp_pures. + wp_apply (start_chan_proto_spec N (map_worker_protocol n ∅)); iIntros (c) "// Hc". + wp_lam. + iMod (contribution_init_pow (A:=gmultisetUR A) n) as (γ) "[Hs Hγs]". + wp_apply (newlock_spec N (map_worker_lock_inv γ c) with "[Hc Hs]"). + { iExists n, ∅. iFrame. } + iIntros (lk γl) "#Hlk". + wp_apply (start_map_workers_spec with "Hf [$Hlk $Hγs]"); auto. Qed. - Lemma mapper_service_loop_spec (n : nat) c vs xs ws X ys : + Lemma par_map_server_spec n c vs xs ws X ys : (n = 0 → X = ∅ ∧ xs = []) → {{{ - c ↣ mapper_protocol n X @ N ∗ + c ↣ map_worker_protocol n X @ N ∗ ([∗ list] x;v ∈ xs;vs, IA x v) ∗ ([∗ list] y;w ∈ ys;ws, IB y w) }}} - mapper_service_loop #n c (llist vs) (llist ws) + par_map_server #n c (llist vs) (llist ws) {{{ ys' ws', RET (llist ws'); - ⌜ys' ≡ₚ (xs ++ elements X) ≫= f⌠∗ [∗ list] y;w ∈ ys' ++ ys;ws', IB y w + ⌜ys' ≡ₚ (xs ++ elements X) ≫= map⌠∗ [∗ list] y;w ∈ ys' ++ ys;ws', IB y w }}}. Proof. iIntros (Hn Φ) "(Hc & HIA & HIB) HΦ". @@ -162,12 +185,12 @@ Section mapper. iIntros (ys' ws'). rewrite gmultiset_elements_disj_union gmultiset_elements_singleton. rewrite assoc_L -(comm _ [x]). iApply "HΦ". - - wp_recv (x w) as (Hx) "HIBfx". + - wp_recv (x w) as (Hx) "HIBx". wp_apply (lapp_spec with "[//]"); iIntros (_). - wp_apply ("IH" $! _ _ _ _ _ (_ ++ _) with "[] Hc HIA [HIBfx HIB]"); first done. + wp_apply ("IH" $! _ _ _ _ _ (_ ++ _) with "[] Hc HIA [HIBx HIB]"); first done. { simpl; iFrame. } iIntros (ys' ws'); iDestruct 1 as (Hys) "HIB"; simplify_eq/=. - iApply ("HΦ" $! (ys' ++ f x)). iSplit. + iApply ("HΦ" $! (ys' ++ map x)). iSplit. + iPureIntro. rewrite (gmultiset_disj_union_difference {[ x ]} X) -?gmultiset_elem_of_singleton_subseteq //. rewrite (comm disj_union) gmultiset_elements_disj_union. @@ -175,25 +198,17 @@ Section mapper. + by rewrite -assoc_L. Qed. - Lemma mapper_service_spec (n : nat) (vf : val) vs xs : + Lemma par_map_spec n vmap vs xs : 0 < n → - (∀ x v, - {{{ IA x v }}} vf v {{{ ws, RET (llist ws); [∗ list] y;w ∈ f x;ws, IB y w }}}) -∗ + map_spec vmap -∗ {{{ [∗ list] x;v ∈ xs;vs, IA x v }}} - mapper_service #n vf (llist vs) - {{{ ys ws, RET (llist ws); ⌜ys ≡ₚ xs ≫= f⌠∗ [∗ list] y;w ∈ ys;ws, IB y w }}}. + par_map #n vmap (llist vs) + {{{ ys ws, RET (llist ws); ⌜ys ≡ₚ xs ≫= map⌠∗ [∗ list] y;w ∈ ys;ws, IB y w }}}. Proof. - iIntros (?) "#Hf !>"; iIntros (Φ) "HI HΦ". wp_lam; wp_pures. - wp_apply (new_lock_spec N with "[//]"); iIntros (lk) "[Hu Hlk]". - wp_apply (start_chan_proto_spec N (mapper_protocol n ∅) with "[Hu Hlk]"); - try iNext; iIntros (c) "Hc". - { wp_lam. - iMod (contribution_init_pow (A:=gmultisetUR A) n) as (γ) "[Hs Hγs]". - iMod ("Hlk" $! (mapper_lock_inv γ c) with "[Hc Hs]") as "#Hlk". - { iExists n, ∅. iFrame. } - wp_apply (start_mappers_spec with "Hf [$Hlk $Hu $Hγs]"); auto. } + iIntros (?) "#Hmap !>"; iIntros (Φ) "HI HΦ". wp_lam; wp_pures. + wp_apply (start_map_service_spec with "Hmap [//]"); iIntros (c) "Hc". wp_pures. wp_apply (lnil_spec with "[//]"); iIntros (_). - wp_apply (mapper_service_loop_spec _ _ _ _ [] ∅ [] with "[$Hc $HI //]"); first lia. + wp_apply (par_map_server_spec _ _ _ _ [] ∅ [] with "[$Hc $HI //]"); first lia. iIntros (ys ws). rewrite /= gmultiset_elements_empty !right_id_L . iApply "HΦ". Qed. -End mapper. +End map. diff --git a/theories/examples/map_reduce.v b/theories/examples/map_reduce.v index 83dd07a6c6d285752f6550c3db97ac8060af119a..cac7f2b1d19bd402eecb59dd15b4fd6e7378d9ec 100644 --- a/theories/examples/map_reduce.v +++ b/theories/examples/map_reduce.v @@ -1,8 +1,8 @@ From stdpp Require Import sorting. From osiris.channel Require Import proto_channel proofmode. From iris.heap_lang Require Import proofmode notation. -From osiris.utils Require Import list compare spin_lock contribution. -From osiris.examples Require Import mapper sort_elem sort_elem_client. +From osiris.utils Require Import list compare contribution. +From osiris.examples Require Import map sort_elem sort_elem_client. From iris.algebra Require Import gmultiset. From Coq Require Import SetoidPermutation. @@ -142,7 +142,7 @@ Section map_reduce. End map_reduce. (** Distributed version *) -Definition map_reduce_service_map_loop : val := +Definition par_map_reduce_map_server : val := rec: "go" "n" "cmap" "csort" "xs" := if: "n" = #0 then #() else if: recv "cmap" then (* send item to mapper *) @@ -158,7 +158,7 @@ Definition map_reduce_service_map_loop : val := send_all "csort" "zs";; "go" "n" "cmap" "csort" "xs". -Definition map_reduce_service_recv_same_key : val := +Definition par_map_reduce_collect : val := rec: "go" "csort" "i" "ys" := if: ~recv "csort" then (("i", "ys"), NONE) else let: "jy" := recv "csort" in @@ -166,7 +166,7 @@ Definition map_reduce_service_recv_same_key : val := if: "i" = "j" then "go" "csort" "j" (lcons "y" "ys") else (("i", "ys"), SOME ("j", "y")). -Definition map_reduce_service_reduce_loop : val := +Definition par_map_reduce_reduce_server : val := rec: "go" "n" "csort" "cred" "acc" "zs" := if: "n" = #0 then "zs" else if: recv "cred" then (* Send item to mapper *) @@ -176,7 +176,7 @@ Definition map_reduce_service_reduce_loop : val := send "cred" #false;; "go" ("n" - #1) "csort" "cred" NONE "zs" | SOME "acc" => (* Read subsequent items with the same key *) - let: "grp" := map_reduce_service_recv_same_key "csort" + let: "grp" := par_map_reduce_collect "csort" (Fst "acc") (lcons (Snd "acc") (lnil #())) in send "cred" #true;; send "cred" (Fst "grp");; @@ -188,32 +188,31 @@ Definition map_reduce_service_reduce_loop : val := Definition cmpZfst : val := λ: "x" "y", Fst "x" ≤ Fst "y". -Definition map_reduce_service : val := λ: "n" "map" "red" "xs", - let: "lmap" := new_lock #() in - let: "cmap" := start_chan (λ: "c", start_mappers "n" "map" "lmap" "c") in +Definition par_map_reduce : val := λ: "n" "map" "red" "xs", + let: "cmap" := start_map_service "n" "map" in let: "csort" := start_chan (λ: "c", sort_elem_service cmpZfst "c") in - map_reduce_service_map_loop "n" "cmap" "csort" "xs";; + par_map_reduce_map_server "n" "cmap" "csort" "xs";; send "csort" #stop;; - let: "lred" := new_lock #() in - let: "cred" := start_chan (λ: "c", start_mappers "n" "red" "lred" "c") in - (* Fetch the first sorted element, which we need in the loop *) + let: "cred" := start_map_service "n" "red" in + (* We need the first sorted element in the loop to compare subsequent elements *) if: ~recv "csort" then lnil #() else (* Handle the empty case *) let: "jy" := recv "csort" in - map_reduce_service_reduce_loop "n" "csort" "cred" (SOME "jy") (lnil #()). + par_map_reduce_reduce_server "n" "csort" "cred" (SOME "jy") (lnil #()). (** Correctness proofs of the distributed version *) Class map_reduceG Σ A B `{Countable A, Countable B} := { - map_reduce_mapG :> mapperG Σ A; - map_reduce_reduceG :> mapperG Σ (Z * list B); + map_reduce_mapG :> mapG Σ A; + map_reduce_reduceG :> mapG Σ (Z * list B); }. Section mapper. Context `{Countable A, Countable B} {C : Type}. - Context `{!heapG Σ, !proto_chanG Σ, !lockG Σ, !map_reduceG Σ A B} (N : namespace). + Context `{!heapG Σ, !proto_chanG Σ, !map_reduceG Σ A B} (N : namespace). Context (IA : A → val → iProp Σ) (IB : Z → B → val → iProp Σ) (IC : C → val → iProp Σ). Context (map : A → list (Z * B)) (red : Z → list B → list C). Context `{!∀ j, Proper ((≡ₚ) ==> (≡ₚ)) (red j)}. Local Open Scope nat_scope. + Implicit Types n : nat. Definition IZB (iy : Z * B) (w : val) : iProp Σ := (∃ w', ⌜ w = (#iy.1, w')%V ⌠∧ IB iy.1 iy.2 w')%I. @@ -236,14 +235,14 @@ Section mapper. repeat case_bool_decide=> //; unfold RZB, prod_relation in *; naive_solver. Qed. - Lemma map_reduce_service_map_loop_spec (n : nat) cmap csort vs xs X ys : + Lemma par_map_reduce_map_server_spec n cmap csort vs xs X ys : (n = 0 → X = ∅ ∧ xs = []) → {{{ - cmap ↣ mapper_protocol IA IZB map n (X : gmultiset A) @ N ∗ + cmap ↣ map_worker_protocol IA IZB map n (X : gmultiset A) @ N ∗ csort ↣ sort_elem_head_protocol IZB RZB ys @ N ∗ ([∗ list] x;v ∈ xs;vs, IA x v) }}} - map_reduce_service_map_loop #n cmap csort (llist vs) + par_map_reduce_map_server #n cmap csort (llist vs) {{{ ys', RET #(); ⌜ys' ≡ₚ (xs ++ elements X) ≫= map⌠∗ csort ↣ sort_elem_head_protocol IZB RZB (ys ++ ys') @ N @@ -280,7 +279,7 @@ Section mapper. by rewrite gmultiset_elements_singleton assoc_L bind_app -Hys /= right_id_L comm. Qed. - Lemma map_reduce_service_recv_same_key_spec csort iys iys_sorted i ys ws : + Lemma par_map_reduce_collect_spec csort iys iys_sorted i ys ws : let acc := from_option (λ '(i,y,w), [(i,y)]) [] in let accv := from_option (λ '(i,y,w), SOMEV (#(i:Z),w)) NONEV in ys ≠[] → @@ -290,7 +289,7 @@ Section mapper. csort ↣ sort_elem_tail_protocol IZB RZB iys (iys_sorted ++ ((i,) <$> ys)) @ N ∗ [∗ list] y;w ∈ ys;ws, IB i y w }}} - map_reduce_service_recv_same_key csort #i (llist (reverse ws)) + par_map_reduce_collect csort #i (llist (reverse ws)) {{{ ys' ws' miy, RET ((#i,llist (reverse ws')),accv miy); ⌜ Sorted RZB ((iys_sorted ++ ((i,) <$> ys ++ ys')) ++ acc miy) ⌠∗ ⌜ from_option (λ '(j,_,_), i ≠j ∧ j ∉ iys_sorted.*1) @@ -331,7 +330,7 @@ Section mapper. eapply elem_of_StronglySorted_app; set_solver. Qed. - Lemma mapper_service_loop_spec (n : nat) iys iys_sorted miy zs ws Y csort cred : + Lemma par_map_reduce_reduce_server_spec n iys iys_sorted miy zs ws Y csort cred : let acc := from_option (λ '(i,y,w), [(i,y)]) [] in let accv := from_option (λ '(i,y,w), SOMEV (#(i:Z),w)) NONEV in (n = 0 → miy = None ∧ Y = ∅) → @@ -340,11 +339,11 @@ Section mapper. {{{ csort ↣ from_option (λ _, sort_elem_tail_protocol IZB RZB iys (iys_sorted ++ acc miy)) END%proto miy @ N ∗ - cred ↣ mapper_protocol IZBs IC (curry red) n (Y : gmultiset (Z * list B)) @ N ∗ + cred ↣ map_worker_protocol IZBs IC (curry red) n (Y : gmultiset (Z * list B)) @ N ∗ from_option (λ '(i,y,w), IB i y w) True miy ∗ ([∗ list] z;w ∈ zs;ws, IC z w) }}} - map_reduce_service_reduce_loop #n csort cred (accv miy) (llist ws) + par_map_reduce_reduce_server #n csort cred (accv miy) (llist ws) {{{ zs' ws', RET (llist ws'); ⌜ (group iys_sorted ≫= curry red) ++ zs' ≡ₚ (group iys ++ elements Y) ≫= curry red ⌠∗ [∗ list] z;w ∈ zs' ++ zs;ws', IC z w @@ -363,7 +362,7 @@ Section mapper. with "[%] [%] [%] Hcsort Hcred [] HIC HΦ"); naive_solver. + wp_apply (lnil_spec with "[//]"); iIntros (_). wp_apply (lcons_spec with "[//]"); iIntros (_). - wp_apply (map_reduce_service_recv_same_key_spec _ _ _ _ [_] [_] + wp_apply (par_map_reduce_collect_spec _ _ _ _ [_] [_] with "[$Hcsort HIB]"); try done. { simpl; iFrame. } iIntros (ys' ws'' miy). @@ -393,60 +392,34 @@ Section mapper. by rewrite right_id_L !assoc_L. Qed. - Definition map_spec (vmap : val) : iProp Σ := (∀ x v, - {{{ IA x v }}} - vmap v - {{{ ws, RET (llist ws); [∗ list] iy;w ∈ map x;ws, IZB iy w }}})%I. - Definition red_spec (vred : val) : iProp Σ := (∀ i ys vs, - {{{ [∗ list] y;v ∈ ys;vs, IB i y v }}} - vred (#i, llist vs)%V - {{{ ws, RET (llist ws); [∗ list] z;w ∈ red i ys;ws, IC z w }}})%I. - - Lemma map_reduce_service_spec (n : nat) vmap vred vs xs : + Lemma par_map_reduce_spec n vmap vred vs xs : 0 < n → - map_spec vmap -∗ - red_spec vred -∗ + map_spec IA IZB map vmap -∗ + map_spec IZBs IC (curry red) vred -∗ {{{ [∗ list] x;v ∈ xs;vs, IA x v }}} - map_reduce_service #n vmap vred (llist vs) + par_map_reduce #n vmap vred (llist vs) {{{ zs ws, RET (llist ws); ⌜zs ≡ₚ map_reduce map red xs⌠∗ [∗ list] z;w ∈ zs;ws, IC z w }}}. Proof. iIntros (?) "#Hmap #Hred !>"; iIntros (Φ) "HI HΦ". wp_lam; wp_pures. - wp_apply (new_lock_spec N with "[//]"); iIntros (lkmap) "[Hu Hlk]". - wp_apply (start_chan_proto_spec N (mapper_protocol IA IZB map n ∅) - with "[Hu Hlk]"); try iNext; iIntros (cmap) "Hcmap". - { wp_lam. - iMod (contribution_init_pow (A:=gmultisetUR A) n) as (γ) "[Hs Hγs]". - iMod ("Hlk" $! (mapper_lock_inv N IA IZB map γ cmap) with "[Hcmap Hs]") as "#Hlk". - { iExists n, ∅. iFrame. } - wp_apply (start_mappers_spec with "Hmap [$Hlk $Hu $Hγs]"); auto. } + wp_apply (start_map_service_spec with "Hmap [//]"); iIntros (cmap) "Hcmap". wp_apply (start_chan_proto_spec N (sort_elem_protocol IZB RZB <++> END)%proto); iIntros (csort) "Hcsort". { wp_apply (sort_elem_service_spec N with "[] Hcsort"); last by auto. iApply RZB_cmp_spec. } rewrite right_id. - wp_apply (map_reduce_service_map_loop_spec with "[$Hcmap $Hcsort $HI]"); first lia. + wp_apply (par_map_reduce_map_server_spec with "[$Hcmap $Hcsort $HI]"); first lia. iIntros (iys). rewrite gmultiset_elements_empty right_id_L. - iDestruct 1 as (Hiys) "Hcsort /=". wp_select; simpl. - wp_apply (new_lock_spec N with "[//]"); iIntros (lkred) "[Hu Hlk]". - wp_apply (start_chan_proto_spec N (mapper_protocol IZBs IC (curry red) n ∅) - with "[Hu Hlk]"); try iNext; iIntros (cred) "Hcred". - { wp_lam. - iMod (contribution_init_pow (A:=gmultisetUR (Z * list B)) n) as (γ) "[Hs Hγs]". - iMod ("Hlk" $! (mapper_lock_inv N IZBs IC (curry red) γ cred) - with "[Hcred Hs]") as "#Hlk". - { iExists n, ∅. iFrame. } - wp_apply (start_mappers_spec with "[Hred] [$Hlk $Hu $Hγs]"); last by auto. - iIntros ([i ys] v) "!>"; iIntros (Ψ); iDestruct 1 as (ws ->) "HIB". - iIntros "HΦ /=". iApply ("Hred" with "HIB HΦ"). } + iDestruct 1 as (Hiys) "Hcsort /=". wp_select; wp_pures; simpl. + wp_apply (start_map_service_spec with "Hred [//]"); iIntros (cred) "Hcred". wp_branch as %_|%Hnil; last first. { wp_pures. wp_apply (lnil_spec with "[//]"); iIntros (_). iApply ("HΦ" $! []); simpl. iSplit; [iPureIntro|done]. by rewrite /map_reduce /= -Hiys -Hnil. } wp_recv ([i y] ?) as (_ w ->) "HIB /="; wp_pures. wp_apply (lnil_spec with "[//]"); iIntros (_). - wp_apply (mapper_service_loop_spec _ _ [] (Some (i, y, w)) [] [] + wp_apply (par_map_reduce_reduce_server_spec _ _ [] (Some (i, y, w)) [] [] with "[$Hcsort $Hcred $HIB]"); simpl; auto; [lia|set_solver|]. iIntros (zs ws). rewrite /= gmultiset_elements_empty !right_id. iDestruct 1 as (Hzs) "HIC". iApply ("HΦ" with "[$HIC]"). by rewrite Hzs Hiys.