Skip to content
Snippets Groups Projects
Commit 5343f4be authored by Robbert Krebbers's avatar Robbert Krebbers
Browse files

Misc cleanup.

parent babec8f2
No related branches found
No related tags found
No related merge requests found
......@@ -14,5 +14,5 @@ theories/examples/sort_client.v
theories/examples/sort_elem.v
theories/examples/loop_sort.v
theories/examples/sort_elem_client.v
theories/examples/mapper.v
theories/examples/map.v
theories/examples/map_reduce.v
From osiris.channel Require Import proto_channel proofmode.
From iris.heap_lang Require Import proofmode notation.
From osiris.utils Require Import list spin_lock contribution.
From iris.heap_lang Require Import proofmode notation lib.spin_lock.
From osiris.utils Require Import list contribution.
From iris.algebra Require Import gmultiset.
Definition mapper : val :=
rec: "go" "f" "l" "c" :=
Definition map_worker : val :=
rec: "go" "map" "l" "c" :=
acquire "l";;
send "c" #true;;
if: ~recv "c" then release "l" else
let: "x" := recv "c" in
release "l";;
let: "y" := "f" "x" in
let: "y" := "map" "x" in
acquire "l";;
send "c" #false;;
send "c" "y";;
release "l";;
"go" "f" "l" "c".
"go" "map" "l" "c".
Definition start_mappers : val :=
rec: "go" "n" "f" "l" "c" :=
Definition start_map_workers : val :=
rec: "go" "n" "map" "l" "c" :=
if: "n" = #0 then #() else
Fork (mapper "f" "l" "c");;
"go" ("n" - #1) "f" "l" "c".
Fork (map_worker "map" "l" "c");;
"go" ("n" - #1) "map" "l" "c".
Definition mapper_service_loop : val :=
Definition start_map_service : val := λ: "n" "map",
start_chan (λ: "c",
let: "l" := newlock #() in
start_map_workers "n" "map" "l" "c").
Definition par_map_server : val :=
rec: "go" "n" "c" "xs" "ys" :=
if: "n" = #0 then "ys" else
if: recv "c" then (* send item to mapper *)
if: recv "c" then (* send item to map_worker *)
if: lisnil "xs" then
send "c" #false;;
"go" ("n" - #1) "c" "xs" "ys"
......@@ -34,26 +39,32 @@ Definition mapper_service_loop : val :=
send "c" #true;;
send "c" (lhead "xs");;
"go" "n" "c" (ltail "xs") "ys"
else (* receive item from mapper *)
else (* receive item from map_worker *)
let: "zs" := recv "c" in
"go" "n" "c" "xs" (lapp "zs" "ys").
Definition mapper_service : val := λ: "n" "f" "xs",
let: "l" := new_lock #() in
let: "c" := start_chan (λ: "c", start_mappers "n" "f" "l" "c") in
mapper_service_loop "n" "c" "xs" (lnil #()).
Definition par_map : val := λ: "n" "map" "xs",
let: "c" := start_map_service "n" "map" in
par_map_server "n" "c" "xs" (lnil #()).
Class mapperG Σ A `{Countable A} := {
mapper_contributionG :> contributionG Σ (gmultisetUR A)
Class mapG Σ A `{Countable A} := {
map_contributionG :> contributionG Σ (gmultisetUR A);
map_lockG :> lockG Σ;
}.
Section mapper.
Section map.
Context `{Countable A} {B : Type}.
Context `{!heapG Σ, !proto_chanG Σ, !lockG Σ, !mapperG Σ A} (N : namespace).
Context (IA : A val iProp Σ) (IB : B val iProp Σ) (f : A list B).
Context `{!heapG Σ, !proto_chanG Σ, !mapG Σ A} (N : namespace).
Context (IA : A val iProp Σ) (IB : B val iProp Σ) (map : A list B).
Local Open Scope nat_scope.
Implicit Types n : nat.
Definition map_spec (vmap : val) : iProp Σ := ( x v,
{{{ IA x v }}}
vmap v
{{{ ws, RET (llist ws); [ list] y;w map x;ws, IB y w }}})%I.
Definition mapper_protocol_aux (rec : nat -d> gmultiset A -d> iProto Σ) :
Definition map_worker_protocol_aux (rec : nat -d> gmultiset A -d> iProto Σ) :
nat -d> gmultiset A -d> iProto Σ := λ i X,
let rec : nat gmultiset A iProto Σ := rec in
(if i is 0 then END else
......@@ -61,29 +72,27 @@ Section mapper.
<+>
rec (pred i) X
) <{ i 1 X = }&>
<?> x ws, MSG llist ws {{ x X [ list] y;w f x;ws, IB y w }};
<?> x ws, MSG llist ws {{ x X [ list] y;w map x;ws, IB y w }};
rec i (X {[ x ]}))%proto.
Instance mapper_protocol_aux_contractive : Contractive mapper_protocol_aux.
Instance map_worker_protocol_aux_contractive : Contractive map_worker_protocol_aux.
Proof. solve_proper_prepare. f_equiv. solve_proto_contractive. Qed.
Definition mapper_protocol := fixpoint mapper_protocol_aux.
Global Instance mapper_protocol_unfold n X :
ProtoUnfold (mapper_protocol n X) (mapper_protocol_aux mapper_protocol n X).
Proof. apply proto_unfold_eq, (fixpoint_unfold mapper_protocol_aux). Qed.
Definition map_worker_protocol := fixpoint map_worker_protocol_aux.
Global Instance map_worker_protocol_unfold n X :
ProtoUnfold (map_worker_protocol n X) (map_worker_protocol_aux map_worker_protocol n X).
Proof. apply proto_unfold_eq, (fixpoint_unfold map_worker_protocol_aux). Qed.
Definition mapper_lock_inv (γ : gname) (c : val) : iProp Σ :=
( i X, server γ i X c iProto_dual (mapper_protocol i X) @ N)%I.
Definition map_worker_lock_inv (γ : gname) (c : val) : iProp Σ :=
( i X, server γ i X c iProto_dual (map_worker_protocol i X) @ N)%I.
Lemma mapper_spec γ (vf : val) lk c q :
( x v,
{{{ IA x v }}} vf v {{{ ws, RET (llist ws); [ list] y;w f x;ws, IB y w }}}) -∗
{{{ is_lock N lk (mapper_lock_inv γ c)
unlocked N lk q client γ ( : gmultiset A) }}}
mapper vf #lk c
Lemma map_worker_spec γl γ vmap lk c :
map_spec vmap -∗
{{{ is_lock N γl lk (map_worker_lock_inv γ c) client γ ( : gmultiset A) }}}
map_worker vmap lk c
{{{ RET #(); True }}}.
Proof.
iIntros "#Hf !>" (Φ) "(#Hlk & Hu & ) HΦ". iLöb as "IH".
iIntros "#Hmap !>" (Φ) "[#Hlk Hγ] HΦ". iLöb as "IH".
wp_rec; wp_pures.
wp_apply (acquire_spec with "[$Hlk $Hu]"); iIntros "[Hl H]".
wp_apply (acquire_spec with "Hlk"); iIntros "[Hl H]".
iDestruct "H" as (i X) "[Hs Hc]".
iDestruct (@server_agree with "Hs Hγ") as %[??]; destruct i as [|i]=>//=.
iAssert S i 1 X = ⌝%I as %?.
......@@ -100,7 +109,7 @@ Section mapper.
rewrite left_id_L.
wp_apply (release_spec with "[$Hlk $Hl Hc Hs]").
{ iExists (S i), _. iFrame. }
clear dependent i X. iIntros "Hu". wp_apply ("Hf" with "HI"); iIntros (w) "HI".
clear dependent i X. iIntros "Hu". wp_apply ("Hmap" with "HI"); iIntros (w) "HI".
wp_apply (acquire_spec with "[$Hlk $Hu]"); iIntros "[Hl H]".
iDestruct "H" as (i X) "[Hs Hc]".
iDestruct (@server_agree with "Hs Hγ")
......@@ -115,33 +124,47 @@ Section mapper.
iIntros "Hu". by wp_apply ("IH" with "[$] [$]").
Qed.
Lemma start_mappers_spec γ (n : nat) (vf : val) lk c q :
( x v,
{{{ IA x v }}} vf v {{{ ws, RET (llist ws); [ list] y;w f x;ws, IB y w }}}) -∗
{{{ is_lock N lk (mapper_lock_inv γ c) unlocked N lk q
client γ (∅:gmultiset A) ^ n }}}
start_mappers #n vf #lk c
Lemma start_map_workers_spec γl γ n vmap lk c :
map_spec vmap -∗
{{{ is_lock N γl lk (map_worker_lock_inv γ c) client γ (∅:gmultiset A) ^ n }}}
start_map_workers #n vmap lk c
{{{ RET #(); True }}}.
Proof.
iIntros "#Hf !>" (Φ) "(#Hlk & Hu & Hγs) HΦ".
iInduction n as [|n] "IH" forall (q); wp_rec; wp_pures; simpl.
iIntros "#Hmap !>" (Φ) "[#Hlk Hγs] HΦ".
iInduction n as [|n] "IH"; wp_rec; wp_pures; simpl.
{ by iApply "HΦ". }
iDestruct "Hγs" as "[Hγ Hγs]"; iDestruct "Hu" as "[Hu Hu']".
wp_apply (wp_fork with "[Hγ Hu]").
{ iNext. wp_apply (mapper_spec with "Hf [$]"); auto. }
iDestruct "Hγs" as "[Hγ Hγs]".
wp_apply (wp_fork with "[Hγ]").
{ iNext. wp_apply (map_worker_spec with "Hmap [$]"); auto. }
wp_pures. rewrite Nat2Z.inj_succ Z.sub_1_r Z.pred_succ.
wp_apply ("IH" with "[$] [$] [$]").
wp_apply ("IH" with "[$] [$]").
Qed.
Lemma start_map_service_spec n vmap :
map_spec vmap -∗
{{{ True }}}
start_map_service #n vmap
{{{ c, RET c; c map_worker_protocol n @ N }}}.
Proof.
iIntros "#Hf !>"; iIntros (Φ _) "HΦ". wp_lam; wp_pures.
wp_apply (start_chan_proto_spec N (map_worker_protocol n )); iIntros (c) "// Hc".
wp_lam.
iMod (contribution_init_pow (A:=gmultisetUR A) n) as (γ) "[Hs Hγs]".
wp_apply (newlock_spec N (map_worker_lock_inv γ c) with "[Hc Hs]").
{ iExists n, ∅. iFrame. }
iIntros (lk γl) "#Hlk".
wp_apply (start_map_workers_spec with "Hf [$Hlk $Hγs]"); auto.
Qed.
Lemma mapper_service_loop_spec (n : nat) c vs xs ws X ys :
Lemma par_map_server_spec n c vs xs ws X ys :
(n = 0 X = xs = [])
{{{
c mapper_protocol n X @ N
c map_worker_protocol n X @ N
([ list] x;v xs;vs, IA x v) ([ list] y;w ys;ws, IB y w)
}}}
mapper_service_loop #n c (llist vs) (llist ws)
par_map_server #n c (llist vs) (llist ws)
{{{ ys' ws', RET (llist ws');
ys' (xs ++ elements X) ≫= f [ list] y;w ys' ++ ys;ws', IB y w
ys' (xs ++ elements X) ≫= map [ list] y;w ys' ++ ys;ws', IB y w
}}}.
Proof.
iIntros (Hn Φ) "(Hc & HIA & HIB) HΦ".
......@@ -162,12 +185,12 @@ Section mapper.
iIntros (ys' ws').
rewrite gmultiset_elements_disj_union gmultiset_elements_singleton.
rewrite assoc_L -(comm _ [x]). iApply "HΦ".
- wp_recv (x w) as (Hx) "HIBfx".
- wp_recv (x w) as (Hx) "HIBx".
wp_apply (lapp_spec with "[//]"); iIntros (_).
wp_apply ("IH" $! _ _ _ _ _ (_ ++ _) with "[] Hc HIA [HIBfx HIB]"); first done.
wp_apply ("IH" $! _ _ _ _ _ (_ ++ _) with "[] Hc HIA [HIBx HIB]"); first done.
{ simpl; iFrame. }
iIntros (ys' ws'); iDestruct 1 as (Hys) "HIB"; simplify_eq/=.
iApply ("HΦ" $! (ys' ++ f x)). iSplit.
iApply ("HΦ" $! (ys' ++ map x)). iSplit.
+ iPureIntro. rewrite (gmultiset_disj_union_difference {[ x ]} X)
-?gmultiset_elem_of_singleton_subseteq //.
rewrite (comm disj_union) gmultiset_elements_disj_union.
......@@ -175,25 +198,17 @@ Section mapper.
+ by rewrite -assoc_L.
Qed.
Lemma mapper_service_spec (n : nat) (vf : val) vs xs :
Lemma par_map_spec n vmap vs xs :
0 < n
( x v,
{{{ IA x v }}} vf v {{{ ws, RET (llist ws); [ list] y;w f x;ws, IB y w }}}) -∗
map_spec vmap -∗
{{{ [ list] x;v xs;vs, IA x v }}}
mapper_service #n vf (llist vs)
{{{ ys ws, RET (llist ws); ys xs ≫= f [ list] y;w ys;ws, IB y w }}}.
par_map #n vmap (llist vs)
{{{ ys ws, RET (llist ws); ys xs ≫= map [ list] y;w ys;ws, IB y w }}}.
Proof.
iIntros (?) "#Hf !>"; iIntros (Φ) "HI HΦ". wp_lam; wp_pures.
wp_apply (new_lock_spec N with "[//]"); iIntros (lk) "[Hu Hlk]".
wp_apply (start_chan_proto_spec N (mapper_protocol n ) with "[Hu Hlk]");
try iNext; iIntros (c) "Hc".
{ wp_lam.
iMod (contribution_init_pow (A:=gmultisetUR A) n) as (γ) "[Hs Hγs]".
iMod ("Hlk" $! (mapper_lock_inv γ c) with "[Hc Hs]") as "#Hlk".
{ iExists n, ∅. iFrame. }
wp_apply (start_mappers_spec with "Hf [$Hlk $Hu $Hγs]"); auto. }
iIntros (?) "#Hmap !>"; iIntros (Φ) "HI HΦ". wp_lam; wp_pures.
wp_apply (start_map_service_spec with "Hmap [//]"); iIntros (c) "Hc".
wp_pures. wp_apply (lnil_spec with "[//]"); iIntros (_).
wp_apply (mapper_service_loop_spec _ _ _ _ [] [] with "[$Hc $HI //]"); first lia.
wp_apply (par_map_server_spec _ _ _ _ [] [] with "[$Hc $HI //]"); first lia.
iIntros (ys ws). rewrite /= gmultiset_elements_empty !right_id_L . iApply "HΦ".
Qed.
End mapper.
End map.
From stdpp Require Import sorting.
From osiris.channel Require Import proto_channel proofmode.
From iris.heap_lang Require Import proofmode notation.
From osiris.utils Require Import list compare spin_lock contribution.
From osiris.examples Require Import mapper sort_elem sort_elem_client.
From osiris.utils Require Import list compare contribution.
From osiris.examples Require Import map sort_elem sort_elem_client.
From iris.algebra Require Import gmultiset.
From Coq Require Import SetoidPermutation.
......@@ -142,7 +142,7 @@ Section map_reduce.
End map_reduce.
(** Distributed version *)
Definition map_reduce_service_map_loop : val :=
Definition par_map_reduce_map_server : val :=
rec: "go" "n" "cmap" "csort" "xs" :=
if: "n" = #0 then #() else
if: recv "cmap" then (* send item to mapper *)
......@@ -158,7 +158,7 @@ Definition map_reduce_service_map_loop : val :=
send_all "csort" "zs";;
"go" "n" "cmap" "csort" "xs".
Definition map_reduce_service_recv_same_key : val :=
Definition par_map_reduce_collect : val :=
rec: "go" "csort" "i" "ys" :=
if: ~recv "csort" then (("i", "ys"), NONE) else
let: "jy" := recv "csort" in
......@@ -166,7 +166,7 @@ Definition map_reduce_service_recv_same_key : val :=
if: "i" = "j" then "go" "csort" "j" (lcons "y" "ys") else
(("i", "ys"), SOME ("j", "y")).
Definition map_reduce_service_reduce_loop : val :=
Definition par_map_reduce_reduce_server : val :=
rec: "go" "n" "csort" "cred" "acc" "zs" :=
if: "n" = #0 then "zs" else
if: recv "cred" then (* Send item to mapper *)
......@@ -176,7 +176,7 @@ Definition map_reduce_service_reduce_loop : val :=
send "cred" #false;; "go" ("n" - #1) "csort" "cred" NONE "zs"
| SOME "acc" =>
(* Read subsequent items with the same key *)
let: "grp" := map_reduce_service_recv_same_key "csort"
let: "grp" := par_map_reduce_collect "csort"
(Fst "acc") (lcons (Snd "acc") (lnil #())) in
send "cred" #true;;
send "cred" (Fst "grp");;
......@@ -188,32 +188,31 @@ Definition map_reduce_service_reduce_loop : val :=
Definition cmpZfst : val := λ: "x" "y", Fst "x" Fst "y".
Definition map_reduce_service : val := λ: "n" "map" "red" "xs",
let: "lmap" := new_lock #() in
let: "cmap" := start_chan (λ: "c", start_mappers "n" "map" "lmap" "c") in
Definition par_map_reduce : val := λ: "n" "map" "red" "xs",
let: "cmap" := start_map_service "n" "map" in
let: "csort" := start_chan (λ: "c", sort_elem_service cmpZfst "c") in
map_reduce_service_map_loop "n" "cmap" "csort" "xs";;
par_map_reduce_map_server "n" "cmap" "csort" "xs";;
send "csort" #stop;;
let: "lred" := new_lock #() in
let: "cred" := start_chan (λ: "c", start_mappers "n" "red" "lred" "c") in
(* Fetch the first sorted element, which we need in the loop *)
let: "cred" := start_map_service "n" "red" in
(* We need the first sorted element in the loop to compare subsequent elements *)
if: ~recv "csort" then lnil #() else (* Handle the empty case *)
let: "jy" := recv "csort" in
map_reduce_service_reduce_loop "n" "csort" "cred" (SOME "jy") (lnil #()).
par_map_reduce_reduce_server "n" "csort" "cred" (SOME "jy") (lnil #()).
(** Correctness proofs of the distributed version *)
Class map_reduceG Σ A B `{Countable A, Countable B} := {
map_reduce_mapG :> mapperG Σ A;
map_reduce_reduceG :> mapperG Σ (Z * list B);
map_reduce_mapG :> mapG Σ A;
map_reduce_reduceG :> mapG Σ (Z * list B);
}.
Section mapper.
Context `{Countable A, Countable B} {C : Type}.
Context `{!heapG Σ, !proto_chanG Σ, !lockG Σ, !map_reduceG Σ A B} (N : namespace).
Context `{!heapG Σ, !proto_chanG Σ, !map_reduceG Σ A B} (N : namespace).
Context (IA : A val iProp Σ) (IB : Z B val iProp Σ) (IC : C val iProp Σ).
Context (map : A list (Z * B)) (red : Z list B list C).
Context `{!∀ j, Proper (() ==> ()) (red j)}.
Local Open Scope nat_scope.
Implicit Types n : nat.
Definition IZB (iy : Z * B) (w : val) : iProp Σ :=
( w', w = (#iy.1, w')%V IB iy.1 iy.2 w')%I.
......@@ -236,14 +235,14 @@ Section mapper.
repeat case_bool_decide=> //; unfold RZB, prod_relation in *; naive_solver.
Qed.
Lemma map_reduce_service_map_loop_spec (n : nat) cmap csort vs xs X ys :
Lemma par_map_reduce_map_server_spec n cmap csort vs xs X ys :
(n = 0 X = xs = [])
{{{
cmap mapper_protocol IA IZB map n (X : gmultiset A) @ N
cmap map_worker_protocol IA IZB map n (X : gmultiset A) @ N
csort sort_elem_head_protocol IZB RZB ys @ N
([ list] x;v xs;vs, IA x v)
}}}
map_reduce_service_map_loop #n cmap csort (llist vs)
par_map_reduce_map_server #n cmap csort (llist vs)
{{{ ys', RET #();
ys' (xs ++ elements X) ≫= map
csort sort_elem_head_protocol IZB RZB (ys ++ ys') @ N
......@@ -280,7 +279,7 @@ Section mapper.
by rewrite gmultiset_elements_singleton assoc_L bind_app -Hys /= right_id_L comm.
Qed.
Lemma map_reduce_service_recv_same_key_spec csort iys iys_sorted i ys ws :
Lemma par_map_reduce_collect_spec csort iys iys_sorted i ys ws :
let acc := from_option (λ '(i,y,w), [(i,y)]) [] in
let accv := from_option (λ '(i,y,w), SOMEV (#(i:Z),w)) NONEV in
ys []
......@@ -290,7 +289,7 @@ Section mapper.
csort sort_elem_tail_protocol IZB RZB iys (iys_sorted ++ ((i,) <$> ys)) @ N
[ list] y;w ys;ws, IB i y w
}}}
map_reduce_service_recv_same_key csort #i (llist (reverse ws))
par_map_reduce_collect csort #i (llist (reverse ws))
{{{ ys' ws' miy, RET ((#i,llist (reverse ws')),accv miy);
Sorted RZB ((iys_sorted ++ ((i,) <$> ys ++ ys')) ++ acc miy)
from_option (λ '(j,_,_), i j j iys_sorted.*1)
......@@ -331,7 +330,7 @@ Section mapper.
eapply elem_of_StronglySorted_app; set_solver.
Qed.
Lemma mapper_service_loop_spec (n : nat) iys iys_sorted miy zs ws Y csort cred :
Lemma par_map_reduce_reduce_server_spec n iys iys_sorted miy zs ws Y csort cred :
let acc := from_option (λ '(i,y,w), [(i,y)]) [] in
let accv := from_option (λ '(i,y,w), SOMEV (#(i:Z),w)) NONEV in
(n = 0 miy = None Y = )
......@@ -340,11 +339,11 @@ Section mapper.
{{{
csort from_option (λ _, sort_elem_tail_protocol IZB RZB iys
(iys_sorted ++ acc miy)) END%proto miy @ N
cred mapper_protocol IZBs IC (curry red) n (Y : gmultiset (Z * list B)) @ N
cred map_worker_protocol IZBs IC (curry red) n (Y : gmultiset (Z * list B)) @ N
from_option (λ '(i,y,w), IB i y w) True miy
([ list] z;w zs;ws, IC z w)
}}}
map_reduce_service_reduce_loop #n csort cred (accv miy) (llist ws)
par_map_reduce_reduce_server #n csort cred (accv miy) (llist ws)
{{{ zs' ws', RET (llist ws');
(group iys_sorted ≫= curry red) ++ zs' (group iys ++ elements Y) ≫= curry red
[ list] z;w zs' ++ zs;ws', IC z w
......@@ -363,7 +362,7 @@ Section mapper.
with "[%] [%] [%] Hcsort Hcred [] HIC HΦ"); naive_solver.
+ wp_apply (lnil_spec with "[//]"); iIntros (_).
wp_apply (lcons_spec with "[//]"); iIntros (_).
wp_apply (map_reduce_service_recv_same_key_spec _ _ _ _ [_] [_]
wp_apply (par_map_reduce_collect_spec _ _ _ _ [_] [_]
with "[$Hcsort HIB]"); try done.
{ simpl; iFrame. }
iIntros (ys' ws'' miy).
......@@ -393,60 +392,34 @@ Section mapper.
by rewrite right_id_L !assoc_L.
Qed.
Definition map_spec (vmap : val) : iProp Σ := ( x v,
{{{ IA x v }}}
vmap v
{{{ ws, RET (llist ws); [ list] iy;w map x;ws, IZB iy w }}})%I.
Definition red_spec (vred : val) : iProp Σ := ( i ys vs,
{{{ [ list] y;v ys;vs, IB i y v }}}
vred (#i, llist vs)%V
{{{ ws, RET (llist ws); [ list] z;w red i ys;ws, IC z w }}})%I.
Lemma map_reduce_service_spec (n : nat) vmap vred vs xs :
Lemma par_map_reduce_spec n vmap vred vs xs :
0 < n
map_spec vmap -∗
red_spec vred -∗
map_spec IA IZB map vmap -∗
map_spec IZBs IC (curry red) vred -∗
{{{ [ list] x;v xs;vs, IA x v }}}
map_reduce_service #n vmap vred (llist vs)
par_map_reduce #n vmap vred (llist vs)
{{{ zs ws, RET (llist ws);
zs map_reduce map red xs [ list] z;w zs;ws, IC z w
}}}.
Proof.
iIntros (?) "#Hmap #Hred !>"; iIntros (Φ) "HI HΦ". wp_lam; wp_pures.
wp_apply (new_lock_spec N with "[//]"); iIntros (lkmap) "[Hu Hlk]".
wp_apply (start_chan_proto_spec N (mapper_protocol IA IZB map n )
with "[Hu Hlk]"); try iNext; iIntros (cmap) "Hcmap".
{ wp_lam.
iMod (contribution_init_pow (A:=gmultisetUR A) n) as (γ) "[Hs Hγs]".
iMod ("Hlk" $! (mapper_lock_inv N IA IZB map γ cmap) with "[Hcmap Hs]") as "#Hlk".
{ iExists n, ∅. iFrame. }
wp_apply (start_mappers_spec with "Hmap [$Hlk $Hu $Hγs]"); auto. }
wp_apply (start_map_service_spec with "Hmap [//]"); iIntros (cmap) "Hcmap".
wp_apply (start_chan_proto_spec N (sort_elem_protocol IZB RZB <++> END)%proto);
iIntros (csort) "Hcsort".
{ wp_apply (sort_elem_service_spec N with "[] Hcsort"); last by auto.
iApply RZB_cmp_spec. }
rewrite right_id.
wp_apply (map_reduce_service_map_loop_spec with "[$Hcmap $Hcsort $HI]"); first lia.
wp_apply (par_map_reduce_map_server_spec with "[$Hcmap $Hcsort $HI]"); first lia.
iIntros (iys). rewrite gmultiset_elements_empty right_id_L.
iDestruct 1 as (Hiys) "Hcsort /=". wp_select; simpl.
wp_apply (new_lock_spec N with "[//]"); iIntros (lkred) "[Hu Hlk]".
wp_apply (start_chan_proto_spec N (mapper_protocol IZBs IC (curry red) n )
with "[Hu Hlk]"); try iNext; iIntros (cred) "Hcred".
{ wp_lam.
iMod (contribution_init_pow (A:=gmultisetUR (Z * list B)) n) as (γ) "[Hs Hγs]".
iMod ("Hlk" $! (mapper_lock_inv N IZBs IC (curry red) γ cred)
with "[Hcred Hs]") as "#Hlk".
{ iExists n, ∅. iFrame. }
wp_apply (start_mappers_spec with "[Hred] [$Hlk $Hu $Hγs]"); last by auto.
iIntros ([i ys] v) "!>"; iIntros (Ψ); iDestruct 1 as (ws ->) "HIB".
iIntros "HΦ /=". iApply ("Hred" with "HIB HΦ"). }
iDestruct 1 as (Hiys) "Hcsort /=". wp_select; wp_pures; simpl.
wp_apply (start_map_service_spec with "Hred [//]"); iIntros (cred) "Hcred".
wp_branch as %_|%Hnil; last first.
{ wp_pures. wp_apply (lnil_spec with "[//]"); iIntros (_).
iApply ("HΦ" $! []); simpl. iSplit; [iPureIntro|done].
by rewrite /map_reduce /= -Hiys -Hnil. }
wp_recv ([i y] ?) as (_ w ->) "HIB /="; wp_pures.
wp_apply (lnil_spec with "[//]"); iIntros (_).
wp_apply (mapper_service_loop_spec _ _ [] (Some (i, y, w)) [] []
wp_apply (par_map_reduce_reduce_server_spec _ _ [] (Some (i, y, w)) [] []
with "[$Hcsort $Hcred $HIB]"); simpl; auto; [lia|set_solver|].
iIntros (zs ws). rewrite /= gmultiset_elements_empty !right_id.
iDestruct 1 as (Hzs) "HIC". iApply ("HΦ" with "[$HIC]"). by rewrite Hzs Hiys.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment