Commit adb7d112 authored by Robbert Krebbers's avatar Robbert Krebbers

Distributed mapper.

parent 46799c39
From stdpp Require Import sorting.
From osiris.channel Require Import proto_channel proofmode.
From iris.heap_lang Require Import proofmode notation.
From iris.heap_lang Require Import assert.
From osiris.utils Require Import list compare spin_lock contribution.
From osiris.examples Require Import list_sort_elem.
From iris.algebra Require Import gmultiset.
Definition mapper : val :=
rec: "go" "f" "l" "c" :=
acquire "l";;
send "c" #true;;
if: ~recv "c" then release "l" else
let: "x" := recv "c" in
release "l";;
let: "y" := "f" "x" in
acquire "l";;
send "c" #false;;
send "c" "y";;
release "l";;
"go" "f" "l" "c".
Definition start_mappers : val :=
rec: "go" "n" "f" "l" "c" :=
if: "n" = #0 then #() else
Fork (mapper "f" "l" "c");;
"go" ("n" - #1) "f" "l" "c".
Definition loop_mappers : val :=
rec: "go" "n" "c" "xs" "ys" :=
if: "n" = #0 then "ys" else
if: recv "c" then
if: lisnil "xs" then
send "c" #false;;
"go" ("n" - #1) "c" "xs" "ys"
else
send "c" #true;;
send "c" (lhead "xs");;
"go" "n" "c" (ltail "xs") "ys"
else
let: "y" := recv "c" in
"go" "n" "c" "xs" (lcons "y" "ys").
Definition mapper_service : val := λ: "n" "f" "xs",
let: "l" := new_lock #() in
let: "c" := start_chan (λ: "c",
start_mappers "n" "f" "l" "c") in
loop_mappers "n" "c" "xs" (lnil #()).
Class map_sortG Σ A `{Countable A} := {
map_sort_contributionG :> contributionG Σ (gmultisetUR A);
map_sort_lockG :> lockG Σ;
}.
Section mapper.
Context `{Countable A, Countable B}.
Context `{!heapG Σ, !proto_chanG Σ, map_sortG Σ A} (N : namespace).
Context (IA : A val iProp Σ) (IB : B val iProp Σ) (f : A B).
Local Open Scope nat_scope.
Definition mapper_protocol_aux (rec : nat -d> gmultiset A -d> iProto Σ) :
nat -d> gmultiset A -d> iProto Σ := λ i X,
let rec : nat gmultiset A iProto Σ := rec in
(if i is 0 then END else
((<!> x v, MSG v {{ IA x v }}; rec i (X {[ x ]}))
<+>
rec (pred i) X
) <{ i 1 X = }&>
<?> x w, MSG w {{ x X IB (f x) w }}; rec i (X {[ x ]}))%proto.
Instance mapper_protocol_aux_contractive : Contractive mapper_protocol_aux.
Proof. solve_proper_prepare. f_equiv. solve_proto_contractive. Qed.
Definition mapper_protocol := fixpoint mapper_protocol_aux.
Global Instance mapper_protocol_unfold n X :
ProtoUnfold (mapper_protocol n X) (mapper_protocol_aux mapper_protocol n X).
Proof. apply proto_unfold_eq, (fixpoint_unfold mapper_protocol_aux). Qed.
Definition mapper_lock_inv (γ : gname) (c : val) : iProp Σ :=
( i X, server γ i X c iProto_dual (mapper_protocol i X) @ N)%I.
Lemma mapper_spec γ (ff : val) lk c q :
( x v, {{{ IA x v }}} ff v {{{ w, RET w; IB (f x) w }}}) -
{{{ is_lock N lk (mapper_lock_inv γ c)
unlocked N lk q client γ ( : gmultiset A) }}}
mapper ff #lk c
{{{ RET #(); True }}}.
Proof.
iIntros "#Hf !>" (Φ) "(#Hlk & Hu & Hγ) HΦ". iLöb as "IH".
wp_rec; wp_pures.
wp_apply (acquire_spec with "[$Hlk $Hu]"); iIntros "[Hl H]".
iDestruct "H" as (i X) "[Hs Hc]".
iDestruct (@server_agree with "Hs Hγ") as %[??]; destruct i as [|i]=>//=.
iAssert S i 1 X = %I as %?.
{ destruct i as [|i]; last auto with lia.
iDestruct (@server_1_agree with "Hs Hγ") as %?%leibniz_equiv; auto. }
wp_select. wp_branch; wp_pures; last first.
{ iMod (@dealloc_client with "Hs Hγ") as "Hs /=".
wp_apply (release_spec with "[$Hlk $Hl Hc Hs]").
{ iExists i, _. iFrame. }
iIntros "_". by iApply "HΦ". }
wp_recv (x v) as "HI".
iMod (@update_client with "Hs Hγ") as "[Hs Hγ]".
{ apply (gmultiset_local_update_alloc _ _ {[ x ]}). }
rewrite left_id_L.
wp_apply (release_spec with "[$Hlk $Hl Hc Hs]").
{ iExists (S i), _. iFrame. }
clear dependent i X. iIntros "Hu". wp_apply ("Hf" with "HI"); iIntros (w) "HI".
wp_apply (acquire_spec with "[$Hlk $Hu]"); iIntros "[Hl H]".
iDestruct "H" as (i X) "[Hs Hc]".
iDestruct (@server_agree with "Hs Hγ")
as %[??%gmultiset_included]; destruct i as [|i]=>//=.
wp_select. wp_send with "[$HI]".
{ by rewrite gmultiset_elem_of_singleton_subseteq. }
iMod (@update_client with "Hs Hγ") as "[Hs Hγ]".
{ by apply (gmultiset_local_update_dealloc _ _ {[ x ]}). }
rewrite gmultiset_difference_diag.
wp_apply (release_spec with "[$Hlk $Hl Hc Hs]").
{ iExists (S i), _. iFrame. }
iIntros "Hu". by wp_apply ("IH" with "[$] [$]").
Qed.
Lemma start_mappers_spec γ (n : nat) (ff : val) lk c q :
( x v, {{{ IA x v }}} ff v {{{ w, RET w; IB (f x) w }}}) -
{{{ is_lock N lk (mapper_lock_inv γ c) unlocked N lk q
n * client γ (:gmultiset A) }}}
start_mappers #n ff #lk c
{{{ RET #(); True }}}.
Proof.
iIntros "#Hf !>" (Φ) "(#Hlk & Hu & Hγs) HΦ".
iInduction n as [|n] "IH" forall (q); wp_rec; wp_pures; simpl.
{ by iApply "HΦ". }
iDestruct "Hγs" as "[Hγ Hγs]"; iDestruct "Hu" as "[Hu Hu']".
wp_apply (wp_fork with "[Hγ Hu]").
{ iNext. wp_apply (mapper_spec with "Hf [$]"); auto. }
wp_pures. rewrite Nat2Z.inj_succ Z.sub_1_r Z.pred_succ.
wp_apply ("IH" with "[$] [$] [$]").
Qed.
Lemma loop_mappers_spec (n : nat) c vs xs ws X_send xs_recv :
(n = 0 X_send = xs = [])
{{{
c mapper_protocol n (X_send : gmultiset A) @ N
([ list] x;v xs;vs, IA x v) ([ list] x;w xs_recv;ws, IB (f x) w)
}}}
loop_mappers #n c (val_encode vs) (val_encode ws)
{{{ ys ws, RET (val_encode ws);
ys ++ (f <$> xs_recv) f <$> (elements X_send ++ xs ++ xs_recv)
[ list] y;w ys ++ (f <$> xs_recv);ws, IB y w
}}}.
Proof.
iIntros (Hn Φ) "(Hc & Hxs & Hxs_recv) HΦ".
iLöb as "IH" forall (n vs xs ws X_send xs_recv Hn Φ); wp_rec; wp_pures; simpl.
case_bool_decide; wp_pures; simplify_eq/=.
{ destruct Hn as [-> ->]; first lia.
iApply ("HΦ" $! []); simpl. rewrite big_sepL2_fmap_l. auto. }
destruct n as [|n]=> //=. wp_branch as %?|%_; wp_pures.
- wp_apply (lisnil_spec (A:=val) with "[//]"); iIntros (_).
destruct vs as [|v vs], xs as [|x xs]; csimpl; try done; wp_pures.
+ wp_select. wp_pures. rewrite Nat2Z.inj_succ Z.sub_1_r Z.pred_succ.
iApply ("IH" with "[] Hc [//] [$] HΦ"). iPureIntro; naive_solver.
+ iDestruct "Hxs" as "[Hx Hxs]". wp_select.
wp_apply (lhead_spec (A:=val) with "[//]"); iIntros (_).
wp_send with "[$Hx]".
wp_apply (ltail_spec (A:=val) with "[//]"); iIntros (_).
wp_apply ("IH" with "[] Hc Hxs Hxs_recv"); first done.
iIntros (ys ws').
rewrite gmultiset_elements_disj_union gmultiset_elements_singleton -!assoc_L.
iApply "HΦ".
- wp_recv (x w) as (HH) "HIfx".
wp_apply (lcons_spec (A:=val) with "[//]"); iIntros (_).
wp_apply ("IH" $! _ _ _ _ _ (_ :: _) with "[] Hc Hxs [HIfx Hxs_recv]"); first done.
{ simpl; iFrame. }
iIntros (ys ws'); iDestruct 1 as (Hys) "H"; simplify_eq/=.
iApply ("HΦ" $! (ys ++ [f x])). iSplit.
+ iPureIntro. rewrite -assoc_L /= Hys.
rewrite {2}(gmultiset_disj_union_difference {[ x ]} X_send)
-?gmultiset_elem_of_singleton_subseteq //.
rewrite (comm disj_union) gmultiset_elements_disj_union.
rewrite gmultiset_elements_singleton.
by rewrite -assoc_L (assoc_L _ [x]) (comm _ [x]) -!assoc_L.
+ by rewrite -assoc_L.
Qed.
Lemma mapper_service_spec (n : nat) (ff : val) vs xs :
0 < n
( x v, {{{ IA x v }}} ff v {{{ w, RET w; IB (f x) w }}}) -
{{{ [ list] x;v xs;vs, IA x v }}}
mapper_service #n ff (val_encode vs)
{{{ ys ws, RET (val_encode ws); ys f <$> xs
[ list] y;w ys;ws, IB y w }}}.
Proof.
iIntros (?) "#Hf !>"; iIntros (Φ) "HI HΦ". wp_lam; wp_pures.
wp_apply (new_lock_spec N with "[//]"); iIntros (lk) "[Hu Hlk]".
wp_apply (start_chan_proto_spec N (mapper_protocol n ) with "[Hu Hlk]");
try iNext; iIntros (c) "Hc".
{ wp_lam.
iMod (contribution_initN (A:=gmultisetUR A) n) as (γ) "[Hs Hγs]".
iMod ("Hlk" $! (mapper_lock_inv γ c) with "[Hc Hs]") as "#Hlk".
{ iExists n, . iFrame. }
wp_apply (start_mappers_spec with "Hf [$Hlk $Hu $Hγs]"); auto. }
wp_pures. wp_apply (lnil_spec with "[//]"); iIntros (_).
wp_apply (loop_mappers_spec _ _ _ _ [] [] with "[$Hc $HI //]"); first lia.
iIntros (ys ws). rewrite /= !right_id_L gmultiset_elements_empty. iApply "HΦ".
Qed.
End mapper.
From iris.base_logic Require Export base_logic lib.iprop lib.own.
From iris.proofmode Require Export tactics.
From iris.algebra Require Import excl auth csum gmultiset frac_auth.
From iris.algebra Require Export local_updates.
Class contributionG Σ (A : ucmraT) `{!CmraDiscrete A} := {
contribution_inG :> inG Σ
......@@ -19,6 +20,12 @@ Definition client `{contributionG Σ A} (γ : gname) (x : A) : iProp Σ :=
Typeclasses Opaque client.
Instance: Params (@client) 5.
(** MOVE *)
Fixpoint bi_mult {PROP : bi} (n : nat) (P : PROP) : PROP :=
match n with O => emp | S n => P bi_mult n P end%I.
Arguments bi_mult {_} _ _%I.
Notation "n * P" := (bi_mult n P) : bi_scope.
Section contribution.
Context `{contributionG Σ A}.
Implicit Types x y : A.
......@@ -77,11 +84,10 @@ Section contribution.
by destruct n.
Qed.
Lemma alloc_client γ n x x' y' :
(x,ε) ~l~> (x',y')
server γ n x == server γ (S n) x' client γ y'.
Lemma alloc_client γ n x :
server γ n x == server γ (S n) x client γ ε.
Proof.
intros Hup. rewrite /server /client.
rewrite /server /client.
destruct (decide (n = 0)) as [->|?]; case_decide; try done.
- iDestruct 1 as (Hx) "[Hs Hc]"; setoid_subst.
iMod (own_update_2 with "Hs Hc") as "[$ $]"; last done.
......@@ -99,28 +105,24 @@ Section contribution.
by apply option_local_update, csum_local_update_l, prod_local_update_2.
Qed.
Lemma dealloc_client γ n x y :
Cancelable y
server γ n (x y) - client γ y == server γ (pred n) x.
Lemma dealloc_client γ n x :
server γ n x - client γ ε == server γ (pred n) x.
Proof.
iIntros (?) "Hs Hc". iDestruct (server_valid with "Hs") as %Hv.
iIntros "Hs Hc". iDestruct (server_valid with "Hs") as %Hv.
destruct (decide (n = 1)) as [->|]; simpl.
- iDestruct (server_1_agree with "Hs Hc") as %Hxy.
move: Hxy. rewrite {1}(comm _ x) -{2}(right_id ε _ y)=> /cancelable.
rewrite {1}comm=> /(_ Hv) ->. rewrite left_id.
- iDestruct (server_1_agree with "Hs Hc") as %->.
rewrite /server /client; repeat case_decide=> //.
iMod (own_update_2 with "Hs Hc") as "[$ $]"; last done.
by apply auth_update, option_local_update, (replace_local_update _ _).
- iDestruct (server_agree with "Hs Hc") as %[? [z Hxy]].
move: Hxy. rewrite (comm _ x)=> /cancelable.
rewrite {1}comm=> /(_ Hv) ->.
- iDestruct (server_agree with "Hs Hc") as %[? [z ->]].
rewrite /server /client. destruct n as [|[|n]]; case_decide=>//=.
iApply (own_update_2 with "Hs Hc"). apply auth_update_dealloc.
rewrite -(right_id _ _ (Some (Cinl (1%positive, _)))).
rewrite Nat2Pos.inj_succ // -Pos.add_1_l.
rewrite -pair_op -Cinl_op Some_op. apply (cancel_local_update _ _ _).
rewrite -pair_op -Cinl_op Some_op left_id. apply (cancel_local_update _ _ _).
Qed.
Lemma update_client γ n x y x' y' :
(x,y) ~l~> (x',y')
server γ n x - client γ y == server γ n x' client γ y'.
......@@ -137,4 +139,12 @@ Section contribution.
by apply auth_update, option_local_update,
csum_local_update_l, prod_local_update_2.
Qed.
(** Derived *)
Lemma contribution_initN n : (|==> γ, server γ n ε n * client γ ε)%I.
Proof.
iMod (contribution_init) as (γ) "Hs". iExists γ.
iInduction n as [|n] "IH"; first by iFrame.
iMod ("IH" with "Hs") as "[Hs $]". by iApply alloc_client.
Qed.
End contribution.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment