LING is an Erlang/OTP-compatible virtual machine that is able to boot in 50 ms without an operating system, either within the Xen hypervisor or on raw hardware. This chapter will guide you through the C interfaces of the LING internals. The purpose of this document is to get you deep inside LING as fast as possible.
sender is the reference to the process that created the timer if msg is a pointer term, or 0 if msg is not. The reference is needed to update the pending_timers count of the sending process and to destroy that process if it is already a zombie. fire is the function that is called when the timer expires. It was introduced so that the same code can be used for TCP/IP and potentially other timers. When enveloped is true, the message {timeout,TRef,Msg} is sent; when it is false, just Msg is sent.
// Timer interface.

// Adds a timer identified by ref_id.
//
//   sender     the process that created the timer when msg is a pointer
//              term, 0 otherwise; it is needed to update the sender's
//              pending_timers count and to destroy the sender if it is
//              already a zombie
//   enveloped  true:  {timeout,TRef,Msg} is sent
//              false: just Msg is sent
//
// NOTE(review): the unit of timeout and the meaning of the int result are
// not visible from the declaration - confirm against the definition.
int etimer_add(uint64_t ref_id, uint64_t timeout,
               term_t dst, term_t msg,
               proc_t *sender, int enveloped);

// NOTE(review): presumably the earliest pending expiry - confirm.
uint64_t etimer_closest_timeout(void);

// NOTE(review): presumably fires every timer due at `now` - confirm.
void etimer_expired(uint64_t now);

// left_ns is an output pointer.
// NOTE(review): presumably receives the remaining time in nanoseconds - confirm.
int etimer_cancel(uint64_t ref_id, int64_t *left_ns);

void etimer_cancel_by_receiver(term_t dst);

// Same parameter shape as etimer_cancel().
int etimer_read(uint64_t ref_id, int64_t *left_ns);

void etimer_fill_root_regs(proc_t *sender,
                           region_t *regs,
                           int nr_expected);
// etimer_t is used as a plain type name by etimer_func_t and by the struct's
// own next pointer, so the typedef has to be in scope before both. Repeating
// an identical typedef is permitted in C11, so this is harmless if a header
// already provides it.
typedef struct etimer_t etimer_t;

// Type of the function stored in etimer_t::fire; it receives the timer record.
typedef int (*etimer_func_t)(etimer_t *tm);

// One timer record.
struct etimer_t {
    uint64_t ref_id;
    uint64_t timeout;
    term_t dst;
    term_t msg;
    // true: {timeout,TRef,Msg} is sent; false: just Msg is sent
    int enveloped;
    // The process that created the timer if msg is a pointer term, 0 if it
    // is not. Needed to update the sender's pending_timers count and to
    // destroy the sender if it is already a zombie.
    proc_t *sender;
    // Called when the timer expires; lets TCP/IP and potentially other
    // timers share the same code.
    etimer_func_t fire;
    // Pointer to another timer record.
    etimer_t *next;
};
// File-scope timer bookkeeping; all three pointers start out null.
// NOTE(review): free_timers and active_timers are presumably lists chained
// through etimer_t::next - confirm against the code that uses them.
static etimer_t *free_timers = 0;
static etimer_t *active_timers = 0;
// NOTE(review): presumably the memory nodes backing the timer records - confirm.
static memnode_t *etimer_nodes = 0;
The message queue is an essential part of message-passing reactive systems. Since the Erlang virtual machine is built upon a message-passing framework, the concepts of a message and a queue are central to the system. All messages are added to the head of the queue atomically. The node is driven by a single processor.
// Message queue interface. Every function operates on the queue passed as mq.

term_t msg_queue_current(msg_queue_t *mq);
int    msg_queue_len(msg_queue_t *mq);
void   msg_queue_next(msg_queue_t *mq);
void   msg_queue_reset(msg_queue_t *mq);
void   msg_queue_drop(msg_queue_t *mq);
void   msg_queue_done(msg_queue_t *mq);

// mark/restore take the same uint32_t pointer type as msg_queue_t::mark.
void   msg_queue_mark(msg_queue_t *mq, uint32_t *mark);
void   msg_queue_restore(msg_queue_t *mq, uint32_t *mark);

// NOTE(review): the meaning of the _N suffix and of the int result is not
// visible here - confirm against the definition.
int    msg_queue_push_N(msg_queue_t *mq, term_t t);

void   msg_queue_fill_root_regs(msg_queue_t *mq, region_t *r);

void   msg_queue_init(msg_queue_t *mq,
                      uint32_t *buf_starts,
                      uint32_t *buf_ends);
typedef struct message_t message_t;

// A single queued message: the message term plus a pointer to another
// message_t.
struct message_t {
    term_t body;
    message_t *next;
};
typedef struct msg_queue_t msg_queue_t;

// State of one message queue.
// NOTE(review): the roles given below are read off the field names and
// types only - confirm against the queue implementation.
struct msg_queue_t {
    message_t *head;            // first message record
    message_t **tail;           // pointer to a message_t* slot
    int count;
    message_t **current;        // pointer to a message_t* slot
    message_t **saved_last;     // pointer to a message_t* slot
    uint32_t *mark;             // same type as the mark argument of
                                // msg_queue_mark()/msg_queue_restore()
    message_t *free;            // presumably recycled message records
    memnode_t *nodes;           // presumably backing memory nodes
};
Monitors are a unidirectional way of tracking processes. They can be stacked. The other tracking primitive of the Erlang VM is the link, which is an unstackable, bidirectional form built on top of two monitors.
// Monitor interface.
// NOTE(review): pid1/pid2 presumably carry the same roles as in monitor_t
// (pid1 = watcher, pid2 = watchee) - confirm against the definitions.

int monitor(uint64_t ref_id, term_t pid1, term_t pid2);
int demonitor(uint64_t ref_id, term_t pid1);

// NOTE(review): `late` is presumably the pid of the process that exited - confirm.
int notify_monitors_N(term_t late, term_t reason);

// Both listing functions return a term and take a heap pointer.
term_t list_monitored_by(term_t pid2, heap_t *hp);
term_t list_monitors(term_t pid1, heap_t *hp);
typedef struct monitor_t monitor_t;

// One monitor record, identified by ref_id.
struct monitor_t {
    uint64_t ref_id;
    term_t pid1;        // watcher
    term_t pid2;        // watchee
    term_t what;        // pid
    monitor_t *next;    // pointer to another monitor record
};
// File-scope monitor bookkeeping; all three pointers start out null.
// NOTE(review): presumably the memory nodes backing the monitor records - confirm.
static memnode_t *monitor_nodes = 0;
// NOTE(review): active_monitors and free_monitors are presumably lists chained
// through monitor_t::next - confirm against the code that uses them.
static monitor_t *active_monitors = 0;
static monitor_t *free_monitors = 0;
Console, Disk, UDP, TCP, DNS and other drivers implement the asynchronous Outlet interface. Outlets give LING its extensibility. From the client's point of view an outlet is more like the BEAM port interface, but on the other hand it is more like a NIF because of its C linkage.
// Signature of a driver factory: takes a process and a set of option bits,
// returns an outlet pointer.
typedef outlet_t *(*outlet_factory_func_t)(proc_t *cont_proc, uint32_t bit_opts);

// Maps a driver name term to a factory function pointer.
// NOTE(review): the result for an unknown name is not visible here - confirm.
outlet_factory_func_t outlet_resolve_driver(term_t name);
// Outlet interface.

// NOTE(review): bit_opts is int32_t here but uint32_t in
// outlet_factory_func_t - confirm which signedness is intended.
outlet_t *outlet_make_N(outlet_vtab_t *vtab,
                        proc_t *cont_proc,
                        int32_t bit_opts,
                        uint32_t extra);

// data/dlen: a byte buffer and its length.
void outlet_new_data(outlet_t *ol, uint8_t *data, int dlen);

term_t outlet_control(outlet_t *ol, uint32_t op,
                      uint8_t *data, int dlen,
                      term_t reply_to, heap_t *hp);

int  outlet_attach(outlet_t *ol);
void outlet_detach(outlet_t *ol);
void outlet_destroy_private(outlet_t *ol);

// Lookups by outlet id term and by name term.
// NOTE(review): the result for a missing outlet is not visible here - confirm.
outlet_t *outlet_lookup(term_t oid);
outlet_t *outlet_lookup_by_name(term_t name);

uint8_t *outlet_get_send_buffer(outlet_t *ol, int len);
int      outlet_send(outlet_t *ol, int len, term_t reply_to);

term_t outlet_all(heap_t *hp);

void outlet_register(outlet_t *ol, term_t name);
void outlet_unregister(outlet_t *ol);

void outlet_pass_new_data(outlet_t *ol, uint8_t *data, int dlen);

int  outlet_signal_exit_N(outlet_t *ol, term_t src, term_t reason);
void outlet_close(outlet_t *ol, term_t reason);
void outlet_destroy(outlet_t *ol);
int  outlet_notify_owner(outlet_t *ol, term_t what);
// Per-outlet state. The field order is kept exactly as declared because it
// determines the structure layout.
struct outlet_t {
    term_t oid;
    term_t name;
    outlet_vtab_t *vtab;
    term_t owner;
    inter_links_t links;
    term_t data;
    memnode_t *home_node;

    outlet_t **ref;
    outlet_t *next;

    uint32_t inout;
    uint32_t binary;
    uint32_t packet;
    uint32_t line;
    uint32_t eof;
    uint32_t notify_on_close;

    // Anonymous union: exactly one of the three pointers is meaningful.
    // NOTE(review): which one presumably depends on the driver - confirm.
    union {
        struct ip_pcb *ip;
        struct udp_pcb *udp;
        struct tcp_pcb *tcp;
    };

    int active;
    int deliver;
    int buffer;
    int header;
    int packet_size;
    int exit_on_close;
    int max_mq_len;

    // TCP - connection mode
    int cr_in_progress;
    term_t cr_reply_to;
    int cr_timeout_set;

    int send_in_progress;
    uint32_t max_send_bufsize;
    uint8_t *send_buffer;
    memnode_t *send_buf_node;
    int send_buf_ack;
    int send_buf_off;
    int send_buf_left;
    term_t send_reply_to;
    int send_timeout_set;
    uint32_t send_timeout;

    uint32_t recv_expected_size;
    uint32_t recv_bufsize;
    uint32_t max_recv_bufsize;
    uint8_t *recv_buffer;
    memnode_t *recv_buf_node;
    int recv_buf_off;

    term_t empty_queue_in_progress;
    term_t empty_queue_reply_to;
    int peer_close_detected;

    // TCP - listening mode
    int backlog;
    acc_pend_t *free_pends;
    memnode_t *pend_nodes;
    acc_pend_t *accepting;
    acc_pend_t *accepted;

    // Disk
    disk_req_t *out_reqs;
    disk_req_t *free_reqs;

    // Unicode
    int unicode_state;

    netfe_t *front_end;
};