`
mryufeng
  • 浏览: 968451 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Erlang: 当你net_adm:ping(Node)的时候发生了什么?流程复杂 但是对理解dist工作原理至关重要

阅读更多
当你net_adm:ping(Node)的时候发生了什么? 这个涉及到很复杂的流程。让我为你解刨:
这个流程很长而且在erlang代码和c代码里面窜来窜去,重要的点 我用红字标注 请各位耐心。

1. net_adm.erl:
ping(Node) when is_atom(Node) ->
    case catch gen:call({net_kernel, Node},
            '$gen_call',
            {is_auth, node()},
            infinity) of
    {ok, yes} -> pong;
    _ ->
        erlang:disconnect_node(Node),
        pang
    end.

2. gen.erl:
%% Remote by name
call({_Name, Node}=Process, Label, Request, Timeout)
  when is_atom(Node), Timeout =:= infinity;
       is_atom(Node), is_integer(Timeout), Timeout >= 0 ->
    if
     node() =:= nonode@nohost ->
         exit({nodedown, Node});
     true ->
         do_call(Process, Label, Request, Timeout)
    end.

do_call(Process, Label, Request, Timeout) ->
    %% We trust the arguments to be correct, i.e
    %% Process is either a local or remote pid,
    %% or a {Name, Node} tuple (of atoms) and in this
    %% case this node (node()) _is_ distributed and Node =/= node().
    Node = case Process of
           {_S, N} ->
           N;
           _ when is_pid(Process) ->
           node(Process);
           _ ->
           node()
       end,
    case catch erlang:monitor(process, Process) of
    Mref when is_reference(Mref) ->
        receive
        {'DOWN', Mref, _, Pid1, noconnection} when is_pid(Pid1) ->
            exit({nodedown, node(Pid1)});
        {'DOWN', Mref, _, _, noconnection} ->
            exit({nodedown, Node});
        {'DOWN', Mref, _, _, _} ->
            exit(noproc)
        after 0 ->
            Process ! {Label, {self(), Mref}, Request},
            wait_resp_mon(Process, Mref, Timeout)
        end;
    {'EXIT', _} ->
        %% Old node is not supporting the monitor.
        %% The other possible case -- this node is not distributed
        %% -- should have been handled earlier.
        %% Do the best possible with monitor_node/2.
        %% This code may hang indefinitely if the Process
        %% does not exist. It is only used for old remote nodes.
        monitor_node(Node, true),
        receive
        {nodedown, Node} ->
            monitor_node(Node, false),
            exit({nodedown, Node})
        after 0 ->
            Mref = make_ref(),
            Process ! {Label, {self(),Mref}, Request},   
            Res = wait_resp(Node, Mref, Timeout),
            monitor_node(Node, false),
            Res
        end
    end.

3 . Process ! {Label, {self(),Mref}, Request},     相当于erlang:send(Process,  {Label, {self(),Mref}, Request});

4. bif.tab
 bif 'erl.lang.proc':send/2    ebif_send_2 send_2 bif erlang:send/2

5. bif.c
Eterm send_2(Process *p, Eterm to, Eterm msg) {
    Sint result = do_send(p, to, msg, !0);
   
    if (result > 0) {
    BUMP_REDS(p, result);
    BIF_RET(msg);
    } else switch (result) {
    case 0:
    BIF_RET(msg);
    break;
    case SEND_TRAP:
    BIF_TRAP2(dsend2_trap, p, to, msg);
    break;
    case SEND_RESCHEDULE:
    BIF_ERROR(p, RESCHEDULE);
    break;
    case SEND_BADARG:
    BIF_ERROR(p, BADARG);
    break;
    case SEND_USER_ERROR:
    BIF_ERROR(p, EXC_ERROR);
    break;
    default:
    ASSERT(! "Illegal send result");
    break;
    }
    ASSERT(! "Can not arrive here");
    BIF_ERROR(p, BADARG);
}

6. bif.c
 Sint
do_send(Process *p, Eterm to, Eterm msg, int suspend) {
    Eterm portid;
    Port *pt;
    Process* rp;
    DistEntry *dep;
    Eterm* tp;

    if (is_internal_pid(to)) {
    if (IS_TRACED(p))
        trace_send(p, to, msg);
    if (p->ct != NULL)
       save_calls(p, &exp_send);
   
    if (internal_pid_index(to) >= erts_max_processes)
        return SEND_BADARG;
    rp = erts_pid2proc(p, ERTS_PROC_LOCK_MAIN, to, ERTS_PROC_LOCKS_MSG_SEND);
   
    if (!rp) {
        ERTS_SMP_ASSERT_IS_NOT_EXITING(p);
        return 0;
    }
    } else if (is_external_pid(to)) {
    Sint res;
    dep = external_pid_dist_entry(to);
    if(dep == erts_this_dist_entry) {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp,
              "Discarding message %T from %T to %T in an old "
              "incarnation (%d) of this node (%d)\n",
              msg,
              p->id,
              to,
              external_pid_creation(to),
              erts_this_node->creation);
        erts_send_error_to_logger(p->group_leader, dsbufp);
        return 0;
    }

    erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN);

    /* Send to remote process */
    if (is_nil(dep->cid))
        res = SEND_TRAP;
    else if (dist_send(p, ERTS_PROC_LOCK_MAIN, dep, to, msg) == 1) {

        if (is_internal_port(dep->cid)) {
        if (suspend) {
            erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port);
            if (erts_system_monitor_flags.busy_dist_port) {
            monitor_generic(p, am_busy_dist_port, dep->cid);
            }
        }
        res = SEND_RESCHEDULE;
        }
        else {
        res = SEND_TRAP;
        }
    }
    else {
        res = 50;
        if (IS_TRACED(p))
        trace_send(p, to, msg);
        if (p->ct != NULL)
        save_calls(p, &exp_send);
    }
   
    erts_dist_op_finalize(dep);

    return res;
    } else if (is_atom(to)) {
    erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
              to,
              &rp, ERTS_PROC_LOCKS_MSG_SEND, 0,
              &pt);

    if (pt) {
        portid = pt->id;
        goto port_common;
    }

    if (IS_TRACED(p))
        trace_send(p, to, msg);
    if (p->ct != NULL)
       save_calls(p, &exp_send);
   
    if (!rp) {
        return SEND_BADARG;
    }
    } else if (is_external_port(to)
           && (external_port_dist_entry(to)
           == erts_this_dist_entry)) {
    erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
    erts_dsprintf(dsbufp,
              "Discarding message %T from %T to %T in an old "
              "incarnation (%d) of this node (%d)\n",
              msg,
              p->id,
              to,
              external_port_creation(to),
              erts_this_node->creation);
    erts_send_error_to_logger(p->group_leader, dsbufp);
    return 0;
    } else if (is_internal_port(to)) {
    portid = to;
    pt = erts_id2port(to, p, ERTS_PROC_LOCK_MAIN);
      port_common:
    ERTS_SMP_LC_ASSERT(!pt || erts_lc_is_port_locked(pt));
    /* XXX let port_command handle the busy stuff !!! */
    if (pt && (pt->status & ERTS_PORT_S_PORT_BUSY)) {
        if (suspend) {
        erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
        if (erts_system_monitor_flags.busy_port) {
            monitor_generic(p, am_busy_port, portid);
        }
        }
        erts_port_release(pt);
        return SEND_RESCHEDULE;
    }
   
    if (IS_TRACED(p))     /* trace once only !! */
        trace_send(p, portid, msg);
    if (p->ct != NULL)
       save_calls(p, &exp_send);
   
    if (SEQ_TRACE_TOKEN(p) != NIL) {
        seq_trace_update_send(p);
        seq_trace_output(SEQ_TRACE_TOKEN(p), msg,
                 SEQ_TRACE_SEND, portid, p);
    }       
   
    /* XXX NO GC in port command */
    erts_port_command(p, p->id, pt, msg);
    if (pt)
        erts_port_release(pt);
    if (ERTS_PROC_IS_EXITING(p)) {
        KILL_CATCHES(p); /* Must exit */
        return SEND_USER_ERROR;
    }
    return 0;
    } else if (is_tuple(to)) { /* Remote send */
    int ret;
    tp = tuple_val(to);
    if (*tp != make_arityval(2))
        return SEND_BADARG;
    if (is_not_atom(tp[1]) || is_not_atom(tp[2]))
        return SEND_BADARG;
   
    /* sysname_to_connected_dist_entry will return NULL if there
       is no dist_entry or the dist_entry has no port*/
    if ((dep = erts_sysname_to_connected_dist_entry(tp[2])) == NULL) {
        return SEND_TRAP;
    }

    if (dep == erts_this_dist_entry) {
        erts_deref_dist_entry(dep);
        if (IS_TRACED(p))
        trace_send(p, to, msg);
        if (p->ct != NULL)
           save_calls(p, &exp_send);

        erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
                  tp[1],
                  &rp, ERTS_PROC_LOCKS_MSG_SEND, 0,
                  &pt);
        if (pt) {
        portid = pt->id;
        goto port_common;
        }

        if (!rp) {
        return 0;
        }
        goto send_message;
    }

    erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN);
    if (is_nil(dep->cid))
        ret = SEND_TRAP;
    else if (dist_reg_send(p, ERTS_PROC_LOCK_MAIN, dep, tp[1], msg) == 1) {
        if (is_internal_port(dep->cid)) {
        if (suspend) {
            erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port);
            if (erts_system_monitor_flags.busy_dist_port) {
            monitor_generic(p, am_busy_dist_port, dep->cid);
            }
        }
        ret = SEND_RESCHEDULE;
        }
        else {
        ret = SEND_TRAP;
        }

    }
    else {
        ret = 0;
        if (IS_TRACED(p))
        trace_send(p, to, msg);
        if (p->ct != NULL)
        save_calls(p, &exp_send);
    }

    erts_dist_op_finalize(dep);
    erts_deref_dist_entry(dep);
    return ret;
    } else {
    if (IS_TRACED(p)) /* XXX Is this really neccessary ??? */
        trace_send(p, to, msg);
    if (p->ct != NULL)
       save_calls(p, &exp_send);
    return SEND_BADARG;
    }
   
 send_message: {
    Uint32 rp_locks = ERTS_PROC_LOCKS_MSG_SEND;
    Sint res;
#ifdef ERTS_SMP
    if (p == rp)
        rp_locks |= ERTS_PROC_LOCK_MAIN;
#endif
    /* send to local process */
    erts_send_message(p, rp, &rp_locks, msg, 0);
#ifdef ERTS_SMP
    res = rp->msg_inq.len*4;
    if (ERTS_PROC_LOCK_MAIN & rp_locks)
        res += rp->msg.len*4;
#else
    res = rp->msg.len*4;
#endif
    erts_smp_proc_unlock(rp,
                 p == rp
                 ? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
                 : rp_locks);
    return res;
    }
}

8. 如果能找到节点的话 就调用dist_send  否者发生SEND_TRAP

9. dist.c


void init_dist(void)
{
    init_alive();
    init_nodes_monitors();

    no_caches = 0;

    /* Lookup/Install all references to trap functions */
    dsend2_trap = trap_function(am_dsend,2);
    dsend3_trap = trap_function(am_dsend,3);
    /*    dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/
    dlink_trap = trap_function(am_dlink,1);
    dunlink_trap = trap_function(am_dunlink,1);
    dmonitor_node_trap = trap_function(am_dmonitor_node,3);
    dgroup_leader_trap = trap_function(am_dgroup_leader,2);
    dexit_trap = trap_function(am_dexit, 2);
    dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
}

static Export*
trap_function(Eterm func, int arity)
{
    return erts_export_put(am_erlang, func, arity);
}
也就是说dsend2_trap 就是erlang:dsend这个函数

当send 失败 的时候 参考5 的 send_2   将执行 BIF_TRAP2(dsend2_trap, p, to, msg);

10。 bif.h
#define BIF_TRAP2(Trap_, p, A0, A1) do {    \
      (p)->arity = 2;                \
      (p)->def_arg_reg[0] = (A0);        \
      (p)->def_arg_reg[1] = (A1);        \
      (p)->def_arg_reg[3] = (Eterm) (Trap_);    \
      (p)->freason = TRAP;            \
      return THE_NON_VALUE;            \
 } while(0)

11. beam_emu.c

 OpCase(call_bif2_e):
    {
    Eterm (*bf)(Process*, Eterm, Eterm, Uint*) = GET_BIF_ADDRESS(Arg(0));
    Eterm result;
    Eterm* next;

    SWAPOUT;
    c_p->fcalls = FCALLS - 1;
    if (FCALLS <= 0) {
       save_calls(c_p, (Export *) Arg(0));
    }
    PreFetch(1, next);
    CHECK_TERM(r(0));
    CHECK_TERM(x(1));
    PROCESS_MAIN_CHK_LOCKS(c_p);
    ASSERT(!ERTS_PROC_IS_EXITING(c_p));
    result = (*bf)(c_p, r(0), x(1), I);
    ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result));
    PROCESS_MAIN_CHK_LOCKS(c_p);
    ERTS_HOLE_CHECK(c_p);
    POST_BIF_GC_SWAPIN(c_p, result);
    FCALLS = c_p->fcalls;
    if (is_value(result)) {
        r(0) = result;
        CHECK_TERM(r(0));
        NextPF(1, next);
    } else if (c_p->freason == RESCHEDULE) {
        c_p->arity = 2;
        goto suspend_bif;
    } else if (c_p->freason == TRAP) {
        goto call_bif_trap3;
    }

    call_bif_trap3:
        SET_CP(c_p, I+2);
        SET_I(((Export *)(c_p->def_arg_reg[3]))->address);
        SWAPIN;
        r(0) = c_p->def_arg_reg[0];
        x(1) = c_p->def_arg_reg[1];
        x(2) = c_p->def_arg_reg[2];
        Dispatch();
也就是说这时候trap erlang:dsend函数被调用。

12. erlang.erl

dsend({Name, Node}, Msg, Opts) ->
    case net_kernel:connect(Node) of
    true -> erlang:send({Name,Node}, Msg, Opts);
    false -> ok;
    ignored -> ok                % Not distributed.
    end.

先链接对方节点 成功 发数据包 转8 实际上是调用dist_send.
整个流程完毕。

这里面trap技术用的很巧妙。

net_kernel:connect 也很复杂 另文分析,请期待。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics