Root/net/tipc/port.c

1/*
2 * net/tipc/port.c: TIPC port code
3 *
4 * Copyright (c) 1992-2007, Ericsson AB
5 * Copyright (c) 2004-2008, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "core.h"
38#include "config.h"
39#include "dbg.h"
40#include "port.h"
41#include "addr.h"
42#include "link.h"
43#include "node.h"
44#include "name_table.h"
45#include "user_reg.h"
46#include "msg.h"
47#include "bcast.h"
48
49/* Connection management: */
50#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */
51#define CONFIRMED 0
52#define PROBING 1
53
54#define MAX_REJECT_SIZE 1024
55
56static struct sk_buff *msg_queue_head = NULL;
57static struct sk_buff *msg_queue_tail = NULL;
58
59DEFINE_SPINLOCK(tipc_port_list_lock);
60static DEFINE_SPINLOCK(queue_lock);
61
62static LIST_HEAD(ports);
63static void port_handle_node_down(unsigned long ref);
64static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err);
65static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
66static void port_timeout(unsigned long ref);
67
68
69static u32 port_peernode(struct port *p_ptr)
70{
71    return msg_destnode(&p_ptr->publ.phdr);
72}
73
74static u32 port_peerport(struct port *p_ptr)
75{
76    return msg_destport(&p_ptr->publ.phdr);
77}
78
79static u32 port_out_seqno(struct port *p_ptr)
80{
81    return msg_transp_seqno(&p_ptr->publ.phdr);
82}
83
84static void port_incr_out_seqno(struct port *p_ptr)
85{
86    struct tipc_msg *m = &p_ptr->publ.phdr;
87
88    if (likely(!msg_routed(m)))
89        return;
90    msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
91}
92
93/**
94 * tipc_multicast - send a multicast message to local and remote destinations
95 */
96
97int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain,
98           u32 num_sect, struct iovec const *msg_sect)
99{
100    struct tipc_msg *hdr;
101    struct sk_buff *buf;
102    struct sk_buff *ibuf = NULL;
103    struct port_list dports = {0, NULL, };
104    struct port *oport = tipc_port_deref(ref);
105    int ext_targets;
106    int res;
107
108    if (unlikely(!oport))
109        return -EINVAL;
110
111    /* Create multicast message */
112
113    hdr = &oport->publ.phdr;
114    msg_set_type(hdr, TIPC_MCAST_MSG);
115    msg_set_nametype(hdr, seq->type);
116    msg_set_namelower(hdr, seq->lower);
117    msg_set_nameupper(hdr, seq->upper);
118    msg_set_hdr_sz(hdr, MCAST_H_SIZE);
119    res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
120            !oport->user_port, &buf);
121    if (unlikely(!buf))
122        return res;
123
124    /* Figure out where to send multicast message */
125
126    ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
127                        TIPC_NODE_SCOPE, &dports);
128
129    /* Send message to destinations (duplicate it only if necessary) */
130
131    if (ext_targets) {
132        if (dports.count != 0) {
133            ibuf = skb_copy(buf, GFP_ATOMIC);
134            if (ibuf == NULL) {
135                tipc_port_list_free(&dports);
136                buf_discard(buf);
137                return -ENOMEM;
138            }
139        }
140        res = tipc_bclink_send_msg(buf);
141        if ((res < 0) && (dports.count != 0)) {
142            buf_discard(ibuf);
143        }
144    } else {
145        ibuf = buf;
146    }
147
148    if (res >= 0) {
149        if (ibuf)
150            tipc_port_recv_mcast(ibuf, &dports);
151    } else {
152        tipc_port_list_free(&dports);
153    }
154    return res;
155}
156
157/**
158 * tipc_port_recv_mcast - deliver multicast message to all destination ports
159 *
160 * If there is no port list, perform a lookup to create one
161 */
162
163void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
164{
165    struct tipc_msg* msg;
166    struct port_list dports = {0, NULL, };
167    struct port_list *item = dp;
168    int cnt = 0;
169
170    msg = buf_msg(buf);
171
172    /* Create destination port list, if one wasn't supplied */
173
174    if (dp == NULL) {
175        tipc_nametbl_mc_translate(msg_nametype(msg),
176                     msg_namelower(msg),
177                     msg_nameupper(msg),
178                     TIPC_CLUSTER_SCOPE,
179                     &dports);
180        item = dp = &dports;
181    }
182
183    /* Deliver a copy of message to each destination port */
184
185    if (dp->count != 0) {
186        if (dp->count == 1) {
187            msg_set_destport(msg, dp->ports[0]);
188            tipc_port_recv_msg(buf);
189            tipc_port_list_free(dp);
190            return;
191        }
192        for (; cnt < dp->count; cnt++) {
193            int index = cnt % PLSIZE;
194            struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
195
196            if (b == NULL) {
197                warn("Unable to deliver multicast message(s)\n");
198                msg_dbg(msg, "LOST:");
199                goto exit;
200            }
201            if ((index == 0) && (cnt != 0)) {
202                item = item->next;
203            }
204            msg_set_destport(buf_msg(b),item->ports[index]);
205            tipc_port_recv_msg(b);
206        }
207    }
208exit:
209    buf_discard(buf);
210    tipc_port_list_free(dp);
211}
212
213/**
214 * tipc_createport_raw - create a generic TIPC port
215 *
216 * Returns pointer to (locked) TIPC port, or NULL if unable to create it
217 */
218
219struct tipc_port *tipc_createport_raw(void *usr_handle,
220            u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
221            void (*wakeup)(struct tipc_port *),
222            const u32 importance)
223{
224    struct port *p_ptr;
225    struct tipc_msg *msg;
226    u32 ref;
227
228    p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
229    if (!p_ptr) {
230        warn("Port creation failed, no memory\n");
231        return NULL;
232    }
233    ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);
234    if (!ref) {
235        warn("Port creation failed, reference table exhausted\n");
236        kfree(p_ptr);
237        return NULL;
238    }
239
240    p_ptr->publ.usr_handle = usr_handle;
241    p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;
242    p_ptr->publ.ref = ref;
243    msg = &p_ptr->publ.phdr;
244    msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0);
245    msg_set_origport(msg, ref);
246    p_ptr->last_in_seqno = 41;
247    p_ptr->sent = 1;
248    INIT_LIST_HEAD(&p_ptr->wait_list);
249    INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
250    p_ptr->congested_link = NULL;
251    p_ptr->dispatcher = dispatcher;
252    p_ptr->wakeup = wakeup;
253    p_ptr->user_port = NULL;
254    k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
255    spin_lock_bh(&tipc_port_list_lock);
256    INIT_LIST_HEAD(&p_ptr->publications);
257    INIT_LIST_HEAD(&p_ptr->port_list);
258    list_add_tail(&p_ptr->port_list, &ports);
259    spin_unlock_bh(&tipc_port_list_lock);
260    return &(p_ptr->publ);
261}
262
263int tipc_deleteport(u32 ref)
264{
265    struct port *p_ptr;
266    struct sk_buff *buf = NULL;
267
268    tipc_withdraw(ref, 0, NULL);
269    p_ptr = tipc_port_lock(ref);
270    if (!p_ptr)
271        return -EINVAL;
272
273    tipc_ref_discard(ref);
274    tipc_port_unlock(p_ptr);
275
276    k_cancel_timer(&p_ptr->timer);
277    if (p_ptr->publ.connected) {
278        buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
279        tipc_nodesub_unsubscribe(&p_ptr->subscription);
280    }
281    if (p_ptr->user_port) {
282        tipc_reg_remove_port(p_ptr->user_port);
283        kfree(p_ptr->user_port);
284    }
285
286    spin_lock_bh(&tipc_port_list_lock);
287    list_del(&p_ptr->port_list);
288    list_del(&p_ptr->wait_list);
289    spin_unlock_bh(&tipc_port_list_lock);
290    k_term_timer(&p_ptr->timer);
291    kfree(p_ptr);
292    dbg("Deleted port %u\n", ref);
293    tipc_net_route_msg(buf);
294    return 0;
295}
296
297/**
298 * tipc_get_port() - return port associated with 'ref'
299 *
300 * Note: Port is not locked.
301 */
302
303struct tipc_port *tipc_get_port(const u32 ref)
304{
305    return (struct tipc_port *)tipc_ref_deref(ref);
306}
307
308/**
309 * tipc_get_handle - return user handle associated to port 'ref'
310 */
311
312void *tipc_get_handle(const u32 ref)
313{
314    struct port *p_ptr;
315    void * handle;
316
317    p_ptr = tipc_port_lock(ref);
318    if (!p_ptr)
319        return NULL;
320    handle = p_ptr->publ.usr_handle;
321    tipc_port_unlock(p_ptr);
322    return handle;
323}
324
325static int port_unreliable(struct port *p_ptr)
326{
327    return msg_src_droppable(&p_ptr->publ.phdr);
328}
329
330int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
331{
332    struct port *p_ptr;
333
334    p_ptr = tipc_port_lock(ref);
335    if (!p_ptr)
336        return -EINVAL;
337    *isunreliable = port_unreliable(p_ptr);
338    tipc_port_unlock(p_ptr);
339    return 0;
340}
341
342int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
343{
344    struct port *p_ptr;
345
346    p_ptr = tipc_port_lock(ref);
347    if (!p_ptr)
348        return -EINVAL;
349    msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
350    tipc_port_unlock(p_ptr);
351    return 0;
352}
353
354static int port_unreturnable(struct port *p_ptr)
355{
356    return msg_dest_droppable(&p_ptr->publ.phdr);
357}
358
359int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
360{
361    struct port *p_ptr;
362
363    p_ptr = tipc_port_lock(ref);
364    if (!p_ptr)
365        return -EINVAL;
366    *isunrejectable = port_unreturnable(p_ptr);
367    tipc_port_unlock(p_ptr);
368    return 0;
369}
370
371int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
372{
373    struct port *p_ptr;
374
375    p_ptr = tipc_port_lock(ref);
376    if (!p_ptr)
377        return -EINVAL;
378    msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
379    tipc_port_unlock(p_ptr);
380    return 0;
381}
382
383/*
384 * port_build_proto_msg(): build a port level protocol
385 * or a connection abortion message. Called with
386 * tipc_port lock on.
387 */
388static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
389                        u32 origport, u32 orignode,
390                        u32 usr, u32 type, u32 err,
391                        u32 seqno, u32 ack)
392{
393    struct sk_buff *buf;
394    struct tipc_msg *msg;
395
396    buf = buf_acquire(LONG_H_SIZE);
397    if (buf) {
398        msg = buf_msg(buf);
399        msg_init(msg, usr, type, LONG_H_SIZE, destnode);
400        msg_set_errcode(msg, err);
401        msg_set_destport(msg, destport);
402        msg_set_origport(msg, origport);
403        msg_set_orignode(msg, orignode);
404        msg_set_transp_seqno(msg, seqno);
405        msg_set_msgcnt(msg, ack);
406        msg_dbg(msg, "PORT>SEND>:");
407    }
408    return buf;
409}
410
411int tipc_reject_msg(struct sk_buff *buf, u32 err)
412{
413    struct tipc_msg *msg = buf_msg(buf);
414    struct sk_buff *rbuf;
415    struct tipc_msg *rmsg;
416    int hdr_sz;
417    u32 imp = msg_importance(msg);
418    u32 data_sz = msg_data_sz(msg);
419
420    if (data_sz > MAX_REJECT_SIZE)
421        data_sz = MAX_REJECT_SIZE;
422    if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
423        imp++;
424    msg_dbg(msg, "port->rej: ");
425
426    /* discard rejected message if it shouldn't be returned to sender */
427    if (msg_errcode(msg) || msg_dest_droppable(msg)) {
428        buf_discard(buf);
429        return data_sz;
430    }
431
432    /* construct rejected message */
433    if (msg_mcast(msg))
434        hdr_sz = MCAST_H_SIZE;
435    else
436        hdr_sz = LONG_H_SIZE;
437    rbuf = buf_acquire(data_sz + hdr_sz);
438    if (rbuf == NULL) {
439        buf_discard(buf);
440        return data_sz;
441    }
442    rmsg = buf_msg(rbuf);
443    msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg));
444    msg_set_errcode(rmsg, err);
445    msg_set_destport(rmsg, msg_origport(msg));
446    msg_set_origport(rmsg, msg_destport(msg));
447    if (msg_short(msg)) {
448        msg_set_orignode(rmsg, tipc_own_addr);
449        /* leave name type & instance as zeroes */
450    } else {
451        msg_set_orignode(rmsg, msg_destnode(msg));
452        msg_set_nametype(rmsg, msg_nametype(msg));
453        msg_set_nameinst(rmsg, msg_nameinst(msg));
454    }
455    msg_set_size(rmsg, data_sz + hdr_sz);
456    skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz);
457
458    /* send self-abort message when rejecting on a connected port */
459    if (msg_connected(msg)) {
460        struct sk_buff *abuf = NULL;
461        struct port *p_ptr = tipc_port_lock(msg_destport(msg));
462
463        if (p_ptr) {
464            if (p_ptr->publ.connected)
465                abuf = port_build_self_abort_msg(p_ptr, err);
466            tipc_port_unlock(p_ptr);
467        }
468        tipc_net_route_msg(abuf);
469    }
470
471    /* send rejected message */
472    buf_discard(buf);
473    tipc_net_route_msg(rbuf);
474    return data_sz;
475}
476
477int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
478                  struct iovec const *msg_sect, u32 num_sect,
479                  int err)
480{
481    struct sk_buff *buf;
482    int res;
483
484    res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
485            !p_ptr->user_port, &buf);
486    if (!buf)
487        return res;
488
489    return tipc_reject_msg(buf, err);
490}
491
492static void port_timeout(unsigned long ref)
493{
494    struct port *p_ptr = tipc_port_lock(ref);
495    struct sk_buff *buf = NULL;
496
497    if (!p_ptr)
498        return;
499
500    if (!p_ptr->publ.connected) {
501        tipc_port_unlock(p_ptr);
502        return;
503    }
504
505    /* Last probe answered ? */
506    if (p_ptr->probing_state == PROBING) {
507        buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
508    } else {
509        buf = port_build_proto_msg(port_peerport(p_ptr),
510                       port_peernode(p_ptr),
511                       p_ptr->publ.ref,
512                       tipc_own_addr,
513                       CONN_MANAGER,
514                       CONN_PROBE,
515                       TIPC_OK,
516                       port_out_seqno(p_ptr),
517                       0);
518        port_incr_out_seqno(p_ptr);
519        p_ptr->probing_state = PROBING;
520        k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
521    }
522    tipc_port_unlock(p_ptr);
523    tipc_net_route_msg(buf);
524}
525
526
527static void port_handle_node_down(unsigned long ref)
528{
529    struct port *p_ptr = tipc_port_lock(ref);
530    struct sk_buff* buf = NULL;
531
532    if (!p_ptr)
533        return;
534    buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
535    tipc_port_unlock(p_ptr);
536    tipc_net_route_msg(buf);
537}
538
539
540static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
541{
542    u32 imp = msg_importance(&p_ptr->publ.phdr);
543
544    if (!p_ptr->publ.connected)
545        return NULL;
546    if (imp < TIPC_CRITICAL_IMPORTANCE)
547        imp++;
548    return port_build_proto_msg(p_ptr->publ.ref,
549                    tipc_own_addr,
550                    port_peerport(p_ptr),
551                    port_peernode(p_ptr),
552                    imp,
553                    TIPC_CONN_MSG,
554                    err,
555                    p_ptr->last_in_seqno + 1,
556                    0);
557}
558
559
560static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
561{
562    u32 imp = msg_importance(&p_ptr->publ.phdr);
563
564    if (!p_ptr->publ.connected)
565        return NULL;
566    if (imp < TIPC_CRITICAL_IMPORTANCE)
567        imp++;
568    return port_build_proto_msg(port_peerport(p_ptr),
569                    port_peernode(p_ptr),
570                    p_ptr->publ.ref,
571                    tipc_own_addr,
572                    imp,
573                    TIPC_CONN_MSG,
574                    err,
575                    port_out_seqno(p_ptr),
576                    0);
577}
578
579void tipc_port_recv_proto_msg(struct sk_buff *buf)
580{
581    struct tipc_msg *msg = buf_msg(buf);
582    struct port *p_ptr = tipc_port_lock(msg_destport(msg));
583    u32 err = TIPC_OK;
584    struct sk_buff *r_buf = NULL;
585    struct sk_buff *abort_buf = NULL;
586
587    msg_dbg(msg, "PORT<RECV<:");
588
589    if (!p_ptr) {
590        err = TIPC_ERR_NO_PORT;
591    } else if (p_ptr->publ.connected) {
592        if (port_peernode(p_ptr) != msg_orignode(msg))
593            err = TIPC_ERR_NO_PORT;
594        if (port_peerport(p_ptr) != msg_origport(msg))
595            err = TIPC_ERR_NO_PORT;
596        if (!err && msg_routed(msg)) {
597            u32 seqno = msg_transp_seqno(msg);
598            u32 myno = ++p_ptr->last_in_seqno;
599            if (seqno != myno) {
600                err = TIPC_ERR_NO_PORT;
601                abort_buf = port_build_self_abort_msg(p_ptr, err);
602            }
603        }
604        if (msg_type(msg) == CONN_ACK) {
605            int wakeup = tipc_port_congested(p_ptr) &&
606                     p_ptr->publ.congested &&
607                     p_ptr->wakeup;
608            p_ptr->acked += msg_msgcnt(msg);
609            if (tipc_port_congested(p_ptr))
610                goto exit;
611            p_ptr->publ.congested = 0;
612            if (!wakeup)
613                goto exit;
614            p_ptr->wakeup(&p_ptr->publ);
615            goto exit;
616        }
617    } else if (p_ptr->publ.published) {
618        err = TIPC_ERR_NO_PORT;
619    }
620    if (err) {
621        r_buf = port_build_proto_msg(msg_origport(msg),
622                         msg_orignode(msg),
623                         msg_destport(msg),
624                         tipc_own_addr,
625                         TIPC_HIGH_IMPORTANCE,
626                         TIPC_CONN_MSG,
627                         err,
628                         0,
629                         0);
630        goto exit;
631    }
632
633    /* All is fine */
634    if (msg_type(msg) == CONN_PROBE) {
635        r_buf = port_build_proto_msg(msg_origport(msg),
636                         msg_orignode(msg),
637                         msg_destport(msg),
638                         tipc_own_addr,
639                         CONN_MANAGER,
640                         CONN_PROBE_REPLY,
641                         TIPC_OK,
642                         port_out_seqno(p_ptr),
643                         0);
644    }
645    p_ptr->probing_state = CONFIRMED;
646    port_incr_out_seqno(p_ptr);
647exit:
648    if (p_ptr)
649        tipc_port_unlock(p_ptr);
650    tipc_net_route_msg(r_buf);
651    tipc_net_route_msg(abort_buf);
652    buf_discard(buf);
653}
654
655static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
656{
657    struct publication *publ;
658
659    if (full_id)
660        tipc_printf(buf, "<%u.%u.%u:%u>:",
661                tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
662                tipc_node(tipc_own_addr), p_ptr->publ.ref);
663    else
664        tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
665
666    if (p_ptr->publ.connected) {
667        u32 dport = port_peerport(p_ptr);
668        u32 destnode = port_peernode(p_ptr);
669
670        tipc_printf(buf, " connected to <%u.%u.%u:%u>",
671                tipc_zone(destnode), tipc_cluster(destnode),
672                tipc_node(destnode), dport);
673        if (p_ptr->publ.conn_type != 0)
674            tipc_printf(buf, " via {%u,%u}",
675                    p_ptr->publ.conn_type,
676                    p_ptr->publ.conn_instance);
677    }
678    else if (p_ptr->publ.published) {
679        tipc_printf(buf, " bound to");
680        list_for_each_entry(publ, &p_ptr->publications, pport_list) {
681            if (publ->lower == publ->upper)
682                tipc_printf(buf, " {%u,%u}", publ->type,
683                        publ->lower);
684            else
685                tipc_printf(buf, " {%u,%u,%u}", publ->type,
686                        publ->lower, publ->upper);
687        }
688    }
689    tipc_printf(buf, "\n");
690}
691
692#define MAX_PORT_QUERY 32768
693
694struct sk_buff *tipc_port_get_ports(void)
695{
696    struct sk_buff *buf;
697    struct tlv_desc *rep_tlv;
698    struct print_buf pb;
699    struct port *p_ptr;
700    int str_len;
701
702    buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
703    if (!buf)
704        return NULL;
705    rep_tlv = (struct tlv_desc *)buf->data;
706
707    tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
708    spin_lock_bh(&tipc_port_list_lock);
709    list_for_each_entry(p_ptr, &ports, port_list) {
710        spin_lock_bh(p_ptr->publ.lock);
711        port_print(p_ptr, &pb, 0);
712        spin_unlock_bh(p_ptr->publ.lock);
713    }
714    spin_unlock_bh(&tipc_port_list_lock);
715    str_len = tipc_printbuf_validate(&pb);
716
717    skb_put(buf, TLV_SPACE(str_len));
718    TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
719
720    return buf;
721}
722
723#if 0
724
725#define MAX_PORT_STATS 2000
726
727struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space)
728{
729    u32 ref;
730    struct port *p_ptr;
731    struct sk_buff *buf;
732    struct tlv_desc *rep_tlv;
733    struct print_buf pb;
734    int str_len;
735
736    if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF))
737        return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
738
739    ref = *(u32 *)TLV_DATA(req_tlv_area);
740    ref = ntohl(ref);
741
742    p_ptr = tipc_port_lock(ref);
743    if (!p_ptr)
744        return cfg_reply_error_string("port not found");
745
746    buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS));
747    if (!buf) {
748        tipc_port_unlock(p_ptr);
749        return NULL;
750    }
751    rep_tlv = (struct tlv_desc *)buf->data;
752
753    tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS);
754    port_print(p_ptr, &pb, 1);
755    /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */
756    tipc_port_unlock(p_ptr);
757    str_len = tipc_printbuf_validate(&pb);
758
759    skb_put(buf, TLV_SPACE(str_len));
760    TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
761
762    return buf;
763}
764
765#endif
766
767void tipc_port_reinit(void)
768{
769    struct port *p_ptr;
770    struct tipc_msg *msg;
771
772    spin_lock_bh(&tipc_port_list_lock);
773    list_for_each_entry(p_ptr, &ports, port_list) {
774        msg = &p_ptr->publ.phdr;
775        if (msg_orignode(msg) == tipc_own_addr)
776            break;
777        msg_set_prevnode(msg, tipc_own_addr);
778        msg_set_orignode(msg, tipc_own_addr);
779    }
780    spin_unlock_bh(&tipc_port_list_lock);
781}
782
783
784/*
785 * port_dispatcher_sigh(): Signal handler for messages destinated
786 * to the tipc_port interface.
787 */
788
789static void port_dispatcher_sigh(void *dummy)
790{
791    struct sk_buff *buf;
792
793    spin_lock_bh(&queue_lock);
794    buf = msg_queue_head;
795    msg_queue_head = NULL;
796    spin_unlock_bh(&queue_lock);
797
798    while (buf) {
799        struct port *p_ptr;
800        struct user_port *up_ptr;
801        struct tipc_portid orig;
802        struct tipc_name_seq dseq;
803        void *usr_handle;
804        int connected;
805        int published;
806        u32 message_type;
807
808        struct sk_buff *next = buf->next;
809        struct tipc_msg *msg = buf_msg(buf);
810        u32 dref = msg_destport(msg);
811
812        message_type = msg_type(msg);
813        if (message_type > TIPC_DIRECT_MSG)
814            goto reject; /* Unsupported message type */
815
816        p_ptr = tipc_port_lock(dref);
817        if (!p_ptr)
818            goto reject; /* Port deleted while msg in queue */
819
820        orig.ref = msg_origport(msg);
821        orig.node = msg_orignode(msg);
822        up_ptr = p_ptr->user_port;
823        usr_handle = up_ptr->usr_handle;
824        connected = p_ptr->publ.connected;
825        published = p_ptr->publ.published;
826
827        if (unlikely(msg_errcode(msg)))
828            goto err;
829
830        switch (message_type) {
831
832        case TIPC_CONN_MSG:{
833                tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
834                u32 peer_port = port_peerport(p_ptr);
835                u32 peer_node = port_peernode(p_ptr);
836
837                tipc_port_unlock(p_ptr);
838                if (unlikely(!cb))
839                    goto reject;
840                if (unlikely(!connected)) {
841                    if (tipc_connect2port(dref, &orig))
842                        goto reject;
843                } else if ((msg_origport(msg) != peer_port) ||
844                       (msg_orignode(msg) != peer_node))
845                    goto reject;
846                if (unlikely(++p_ptr->publ.conn_unacked >=
847                         TIPC_FLOW_CONTROL_WIN))
848                    tipc_acknowledge(dref,
849                             p_ptr->publ.conn_unacked);
850                skb_pull(buf, msg_hdr_sz(msg));
851                cb(usr_handle, dref, &buf, msg_data(msg),
852                   msg_data_sz(msg));
853                break;
854            }
855        case TIPC_DIRECT_MSG:{
856                tipc_msg_event cb = up_ptr->msg_cb;
857
858                tipc_port_unlock(p_ptr);
859                if (unlikely(!cb || connected))
860                    goto reject;
861                skb_pull(buf, msg_hdr_sz(msg));
862                cb(usr_handle, dref, &buf, msg_data(msg),
863                   msg_data_sz(msg), msg_importance(msg),
864                   &orig);
865                break;
866            }
867        case TIPC_MCAST_MSG:
868        case TIPC_NAMED_MSG:{
869                tipc_named_msg_event cb = up_ptr->named_msg_cb;
870
871                tipc_port_unlock(p_ptr);
872                if (unlikely(!cb || connected || !published))
873                    goto reject;
874                dseq.type = msg_nametype(msg);
875                dseq.lower = msg_nameinst(msg);
876                dseq.upper = (message_type == TIPC_NAMED_MSG)
877                    ? dseq.lower : msg_nameupper(msg);
878                skb_pull(buf, msg_hdr_sz(msg));
879                cb(usr_handle, dref, &buf, msg_data(msg),
880                   msg_data_sz(msg), msg_importance(msg),
881                   &orig, &dseq);
882                break;
883            }
884        }
885        if (buf)
886            buf_discard(buf);
887        buf = next;
888        continue;
889err:
890        switch (message_type) {
891
892        case TIPC_CONN_MSG:{
893                tipc_conn_shutdown_event cb =
894                    up_ptr->conn_err_cb;
895                u32 peer_port = port_peerport(p_ptr);
896                u32 peer_node = port_peernode(p_ptr);
897
898                tipc_port_unlock(p_ptr);
899                if (!cb || !connected)
900                    break;
901                if ((msg_origport(msg) != peer_port) ||
902                    (msg_orignode(msg) != peer_node))
903                    break;
904                tipc_disconnect(dref);
905                skb_pull(buf, msg_hdr_sz(msg));
906                cb(usr_handle, dref, &buf, msg_data(msg),
907                   msg_data_sz(msg), msg_errcode(msg));
908                break;
909            }
910        case TIPC_DIRECT_MSG:{
911                tipc_msg_err_event cb = up_ptr->err_cb;
912
913                tipc_port_unlock(p_ptr);
914                if (!cb || connected)
915                    break;
916                skb_pull(buf, msg_hdr_sz(msg));
917                cb(usr_handle, dref, &buf, msg_data(msg),
918                   msg_data_sz(msg), msg_errcode(msg), &orig);
919                break;
920            }
921        case TIPC_MCAST_MSG:
922        case TIPC_NAMED_MSG:{
923                tipc_named_msg_err_event cb =
924                    up_ptr->named_err_cb;
925
926                tipc_port_unlock(p_ptr);
927                if (!cb || connected)
928                    break;
929                dseq.type = msg_nametype(msg);
930                dseq.lower = msg_nameinst(msg);
931                dseq.upper = (message_type == TIPC_NAMED_MSG)
932                    ? dseq.lower : msg_nameupper(msg);
933                skb_pull(buf, msg_hdr_sz(msg));
934                cb(usr_handle, dref, &buf, msg_data(msg),
935                   msg_data_sz(msg), msg_errcode(msg), &dseq);
936                break;
937            }
938        }
939        if (buf)
940            buf_discard(buf);
941        buf = next;
942        continue;
943reject:
944        tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
945        buf = next;
946    }
947}
948
949/*
950 * port_dispatcher(): Dispatcher for messages destinated
951 * to the tipc_port interface. Called with port locked.
952 */
953
954static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
955{
956    buf->next = NULL;
957    spin_lock_bh(&queue_lock);
958    if (msg_queue_head) {
959        msg_queue_tail->next = buf;
960        msg_queue_tail = buf;
961    } else {
962        msg_queue_tail = msg_queue_head = buf;
963        tipc_k_signal((Handler)port_dispatcher_sigh, 0);
964    }
965    spin_unlock_bh(&queue_lock);
966    return 0;
967}
968
969/*
970 * Wake up port after congestion: Called with port locked,
971 *
972 */
973
974static void port_wakeup_sh(unsigned long ref)
975{
976    struct port *p_ptr;
977    struct user_port *up_ptr;
978    tipc_continue_event cb = NULL;
979    void *uh = NULL;
980
981    p_ptr = tipc_port_lock(ref);
982    if (p_ptr) {
983        up_ptr = p_ptr->user_port;
984        if (up_ptr) {
985            cb = up_ptr->continue_event_cb;
986            uh = up_ptr->usr_handle;
987        }
988        tipc_port_unlock(p_ptr);
989    }
990    if (cb)
991        cb(uh, ref);
992}
993
994
995static void port_wakeup(struct tipc_port *p_ptr)
996{
997    tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
998}
999
1000void tipc_acknowledge(u32 ref, u32 ack)
1001{
1002    struct port *p_ptr;
1003    struct sk_buff *buf = NULL;
1004
1005    p_ptr = tipc_port_lock(ref);
1006    if (!p_ptr)
1007        return;
1008    if (p_ptr->publ.connected) {
1009        p_ptr->publ.conn_unacked -= ack;
1010        buf = port_build_proto_msg(port_peerport(p_ptr),
1011                       port_peernode(p_ptr),
1012                       ref,
1013                       tipc_own_addr,
1014                       CONN_MANAGER,
1015                       CONN_ACK,
1016                       TIPC_OK,
1017                       port_out_seqno(p_ptr),
1018                       ack);
1019    }
1020    tipc_port_unlock(p_ptr);
1021    tipc_net_route_msg(buf);
1022}
1023
1024/*
1025 * tipc_createport(): user level call. Will add port to
1026 * registry if non-zero user_ref.
1027 */
1028
1029int tipc_createport(u32 user_ref,
1030            void *usr_handle,
1031            unsigned int importance,
1032            tipc_msg_err_event error_cb,
1033            tipc_named_msg_err_event named_error_cb,
1034            tipc_conn_shutdown_event conn_error_cb,
1035            tipc_msg_event msg_cb,
1036            tipc_named_msg_event named_msg_cb,
1037            tipc_conn_msg_event conn_msg_cb,
1038            tipc_continue_event continue_event_cb,/* May be zero */
1039            u32 *portref)
1040{
1041    struct user_port *up_ptr;
1042    struct port *p_ptr;
1043
1044    up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
1045    if (!up_ptr) {
1046        warn("Port creation failed, no memory\n");
1047        return -ENOMEM;
1048    }
1049    p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher,
1050                           port_wakeup, importance);
1051    if (!p_ptr) {
1052        kfree(up_ptr);
1053        return -ENOMEM;
1054    }
1055
1056    p_ptr->user_port = up_ptr;
1057    up_ptr->user_ref = user_ref;
1058    up_ptr->usr_handle = usr_handle;
1059    up_ptr->ref = p_ptr->publ.ref;
1060    up_ptr->err_cb = error_cb;
1061    up_ptr->named_err_cb = named_error_cb;
1062    up_ptr->conn_err_cb = conn_error_cb;
1063    up_ptr->msg_cb = msg_cb;
1064    up_ptr->named_msg_cb = named_msg_cb;
1065    up_ptr->conn_msg_cb = conn_msg_cb;
1066    up_ptr->continue_event_cb = continue_event_cb;
1067    INIT_LIST_HEAD(&up_ptr->uport_list);
1068    tipc_reg_add_port(up_ptr);
1069    *portref = p_ptr->publ.ref;
1070    tipc_port_unlock(p_ptr);
1071    return 0;
1072}
1073
1074int tipc_ownidentity(u32 ref, struct tipc_portid *id)
1075{
1076    id->ref = ref;
1077    id->node = tipc_own_addr;
1078    return 0;
1079}
1080
1081int tipc_portimportance(u32 ref, unsigned int *importance)
1082{
1083    struct port *p_ptr;
1084
1085    p_ptr = tipc_port_lock(ref);
1086    if (!p_ptr)
1087        return -EINVAL;
1088    *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);
1089    tipc_port_unlock(p_ptr);
1090    return 0;
1091}
1092
1093int tipc_set_portimportance(u32 ref, unsigned int imp)
1094{
1095    struct port *p_ptr;
1096
1097    if (imp > TIPC_CRITICAL_IMPORTANCE)
1098        return -EINVAL;
1099
1100    p_ptr = tipc_port_lock(ref);
1101    if (!p_ptr)
1102        return -EINVAL;
1103    msg_set_importance(&p_ptr->publ.phdr, (u32)imp);
1104    tipc_port_unlock(p_ptr);
1105    return 0;
1106}
1107
1108
1109int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1110{
1111    struct port *p_ptr;
1112    struct publication *publ;
1113    u32 key;
1114    int res = -EINVAL;
1115
1116    p_ptr = tipc_port_lock(ref);
1117    if (!p_ptr)
1118        return -EINVAL;
1119
1120    dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, "
1121        "lower = %u, upper = %u\n",
1122        ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper);
1123    if (p_ptr->publ.connected)
1124        goto exit;
1125    if (seq->lower > seq->upper)
1126        goto exit;
1127    if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
1128        goto exit;
1129    key = ref + p_ptr->pub_count + 1;
1130    if (key == ref) {
1131        res = -EADDRINUSE;
1132        goto exit;
1133    }
1134    publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
1135                    scope, p_ptr->publ.ref, key);
1136    if (publ) {
1137        list_add(&publ->pport_list, &p_ptr->publications);
1138        p_ptr->pub_count++;
1139        p_ptr->publ.published = 1;
1140        res = 0;
1141    }
1142exit:
1143    tipc_port_unlock(p_ptr);
1144    return res;
1145}
1146
1147int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1148{
1149    struct port *p_ptr;
1150    struct publication *publ;
1151    struct publication *tpubl;
1152    int res = -EINVAL;
1153
1154    p_ptr = tipc_port_lock(ref);
1155    if (!p_ptr)
1156        return -EINVAL;
1157    if (!seq) {
1158        list_for_each_entry_safe(publ, tpubl,
1159                     &p_ptr->publications, pport_list) {
1160            tipc_nametbl_withdraw(publ->type, publ->lower,
1161                          publ->ref, publ->key);
1162        }
1163        res = 0;
1164    } else {
1165        list_for_each_entry_safe(publ, tpubl,
1166                     &p_ptr->publications, pport_list) {
1167            if (publ->scope != scope)
1168                continue;
1169            if (publ->type != seq->type)
1170                continue;
1171            if (publ->lower != seq->lower)
1172                continue;
1173            if (publ->upper != seq->upper)
1174                break;
1175            tipc_nametbl_withdraw(publ->type, publ->lower,
1176                          publ->ref, publ->key);
1177            res = 0;
1178            break;
1179        }
1180    }
1181    if (list_empty(&p_ptr->publications))
1182        p_ptr->publ.published = 0;
1183    tipc_port_unlock(p_ptr);
1184    return res;
1185}
1186
1187int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1188{
1189    struct port *p_ptr;
1190    struct tipc_msg *msg;
1191    int res = -EINVAL;
1192
1193    p_ptr = tipc_port_lock(ref);
1194    if (!p_ptr)
1195        return -EINVAL;
1196    if (p_ptr->publ.published || p_ptr->publ.connected)
1197        goto exit;
1198    if (!peer->ref)
1199        goto exit;
1200
1201    msg = &p_ptr->publ.phdr;
1202    msg_set_destnode(msg, peer->node);
1203    msg_set_destport(msg, peer->ref);
1204    msg_set_orignode(msg, tipc_own_addr);
1205    msg_set_origport(msg, p_ptr->publ.ref);
1206    msg_set_transp_seqno(msg, 42);
1207    msg_set_type(msg, TIPC_CONN_MSG);
1208    if (!may_route(peer->node))
1209        msg_set_hdr_sz(msg, SHORT_H_SIZE);
1210    else
1211        msg_set_hdr_sz(msg, LONG_H_SIZE);
1212
1213    p_ptr->probing_interval = PROBING_INTERVAL;
1214    p_ptr->probing_state = CONFIRMED;
1215    p_ptr->publ.connected = 1;
1216    k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1217
1218    tipc_nodesub_subscribe(&p_ptr->subscription,peer->node,
1219              (void *)(unsigned long)ref,
1220              (net_ev_handler)port_handle_node_down);
1221    res = 0;
1222exit:
1223    tipc_port_unlock(p_ptr);
1224    p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1225    return res;
1226}
1227
1228/**
1229 * tipc_disconnect_port - disconnect port from peer
1230 *
1231 * Port must be locked.
1232 */
1233
1234int tipc_disconnect_port(struct tipc_port *tp_ptr)
1235{
1236    int res;
1237
1238    if (tp_ptr->connected) {
1239        tp_ptr->connected = 0;
1240        /* let timer expire on it's own to avoid deadlock! */
1241        tipc_nodesub_unsubscribe(
1242            &((struct port *)tp_ptr)->subscription);
1243        res = 0;
1244    } else {
1245        res = -ENOTCONN;
1246    }
1247    return res;
1248}
1249
1250/*
1251 * tipc_disconnect(): Disconnect port form peer.
1252 * This is a node local operation.
1253 */
1254
1255int tipc_disconnect(u32 ref)
1256{
1257    struct port *p_ptr;
1258    int res;
1259
1260    p_ptr = tipc_port_lock(ref);
1261    if (!p_ptr)
1262        return -EINVAL;
1263    res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1264    tipc_port_unlock(p_ptr);
1265    return res;
1266}
1267
1268/*
1269 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1270 */
1271int tipc_shutdown(u32 ref)
1272{
1273    struct port *p_ptr;
1274    struct sk_buff *buf = NULL;
1275
1276    p_ptr = tipc_port_lock(ref);
1277    if (!p_ptr)
1278        return -EINVAL;
1279
1280    if (p_ptr->publ.connected) {
1281        u32 imp = msg_importance(&p_ptr->publ.phdr);
1282        if (imp < TIPC_CRITICAL_IMPORTANCE)
1283            imp++;
1284        buf = port_build_proto_msg(port_peerport(p_ptr),
1285                       port_peernode(p_ptr),
1286                       ref,
1287                       tipc_own_addr,
1288                       imp,
1289                       TIPC_CONN_MSG,
1290                       TIPC_CONN_SHUTDOWN,
1291                       port_out_seqno(p_ptr),
1292                       0);
1293    }
1294    tipc_port_unlock(p_ptr);
1295    tipc_net_route_msg(buf);
1296    return tipc_disconnect(ref);
1297}
1298
1299int tipc_isconnected(u32 ref, int *isconnected)
1300{
1301    struct port *p_ptr;
1302
1303    p_ptr = tipc_port_lock(ref);
1304    if (!p_ptr)
1305        return -EINVAL;
1306    *isconnected = p_ptr->publ.connected;
1307    tipc_port_unlock(p_ptr);
1308    return 0;
1309}
1310
1311int tipc_peer(u32 ref, struct tipc_portid *peer)
1312{
1313    struct port *p_ptr;
1314    int res;
1315
1316    p_ptr = tipc_port_lock(ref);
1317    if (!p_ptr)
1318        return -EINVAL;
1319    if (p_ptr->publ.connected) {
1320        peer->ref = port_peerport(p_ptr);
1321        peer->node = port_peernode(p_ptr);
1322        res = 0;
1323    } else
1324        res = -ENOTCONN;
1325    tipc_port_unlock(p_ptr);
1326    return res;
1327}
1328
1329int tipc_ref_valid(u32 ref)
1330{
1331    /* Works irrespective of type */
1332    return !!tipc_ref_deref(ref);
1333}
1334
1335
1336/*
1337 * tipc_port_recv_sections(): Concatenate and deliver sectioned
1338 * message for this node.
1339 */
1340
1341int tipc_port_recv_sections(struct port *sender, unsigned int num_sect,
1342               struct iovec const *msg_sect)
1343{
1344    struct sk_buff *buf;
1345    int res;
1346
1347    res = msg_build(&sender->publ.phdr, msg_sect, num_sect,
1348            MAX_MSG_SIZE, !sender->user_port, &buf);
1349    if (likely(buf))
1350        tipc_port_recv_msg(buf);
1351    return res;
1352}
1353
1354/**
1355 * tipc_send - send message sections on connection
1356 */
1357
1358int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1359{
1360    struct port *p_ptr;
1361    u32 destnode;
1362    int res;
1363
1364    p_ptr = tipc_port_deref(ref);
1365    if (!p_ptr || !p_ptr->publ.connected)
1366        return -EINVAL;
1367
1368    p_ptr->publ.congested = 1;
1369    if (!tipc_port_congested(p_ptr)) {
1370        destnode = port_peernode(p_ptr);
1371        if (likely(destnode != tipc_own_addr))
1372            res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1373                               destnode);
1374        else
1375            res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1376
1377        if (likely(res != -ELINKCONG)) {
1378            port_incr_out_seqno(p_ptr);
1379            p_ptr->publ.congested = 0;
1380            p_ptr->sent++;
1381            return res;
1382        }
1383    }
1384    if (port_unreliable(p_ptr)) {
1385        p_ptr->publ.congested = 0;
1386        /* Just calculate msg length and return */
1387        return msg_calc_data_size(msg_sect, num_sect);
1388    }
1389    return -ELINKCONG;
1390}
1391
1392/**
1393 * tipc_send_buf - send message buffer on connection
1394 */
1395
1396int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz)
1397{
1398    struct port *p_ptr;
1399    struct tipc_msg *msg;
1400    u32 destnode;
1401    u32 hsz;
1402    u32 sz;
1403    u32 res;
1404
1405    p_ptr = tipc_port_deref(ref);
1406    if (!p_ptr || !p_ptr->publ.connected)
1407        return -EINVAL;
1408
1409    msg = &p_ptr->publ.phdr;
1410    hsz = msg_hdr_sz(msg);
1411    sz = hsz + dsz;
1412    msg_set_size(msg, sz);
1413    if (skb_cow(buf, hsz))
1414        return -ENOMEM;
1415
1416    skb_push(buf, hsz);
1417    skb_copy_to_linear_data(buf, msg, hsz);
1418    destnode = msg_destnode(msg);
1419    p_ptr->publ.congested = 1;
1420    if (!tipc_port_congested(p_ptr)) {
1421        if (likely(destnode != tipc_own_addr))
1422            res = tipc_send_buf_fast(buf, destnode);
1423        else {
1424            tipc_port_recv_msg(buf);
1425            res = sz;
1426        }
1427        if (likely(res != -ELINKCONG)) {
1428            port_incr_out_seqno(p_ptr);
1429            p_ptr->sent++;
1430            p_ptr->publ.congested = 0;
1431            return res;
1432        }
1433    }
1434    if (port_unreliable(p_ptr)) {
1435        p_ptr->publ.congested = 0;
1436        return dsz;
1437    }
1438    return -ELINKCONG;
1439}
1440
1441/**
1442 * tipc_forward2name - forward message sections to port name
1443 */
1444
1445int tipc_forward2name(u32 ref,
1446              struct tipc_name const *name,
1447              u32 domain,
1448              u32 num_sect,
1449              struct iovec const *msg_sect,
1450              struct tipc_portid const *orig,
1451              unsigned int importance)
1452{
1453    struct port *p_ptr;
1454    struct tipc_msg *msg;
1455    u32 destnode = domain;
1456    u32 destport = 0;
1457    int res;
1458
1459    p_ptr = tipc_port_deref(ref);
1460    if (!p_ptr || p_ptr->publ.connected)
1461        return -EINVAL;
1462
1463    msg = &p_ptr->publ.phdr;
1464    msg_set_type(msg, TIPC_NAMED_MSG);
1465    msg_set_orignode(msg, orig->node);
1466    msg_set_origport(msg, orig->ref);
1467    msg_set_hdr_sz(msg, LONG_H_SIZE);
1468    msg_set_nametype(msg, name->type);
1469    msg_set_nameinst(msg, name->instance);
1470    msg_set_lookup_scope(msg, addr_scope(domain));
1471    if (importance <= TIPC_CRITICAL_IMPORTANCE)
1472        msg_set_importance(msg,importance);
1473    destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1474    msg_set_destnode(msg, destnode);
1475    msg_set_destport(msg, destport);
1476
1477    if (likely(destport || destnode)) {
1478        p_ptr->sent++;
1479        if (likely(destnode == tipc_own_addr))
1480            return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1481        res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1482                           destnode);
1483        if (likely(res != -ELINKCONG))
1484            return res;
1485        if (port_unreliable(p_ptr)) {
1486            /* Just calculate msg length and return */
1487            return msg_calc_data_size(msg_sect, num_sect);
1488        }
1489        return -ELINKCONG;
1490    }
1491    return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1492                     TIPC_ERR_NO_NAME);
1493}
1494
1495/**
1496 * tipc_send2name - send message sections to port name
1497 */
1498
1499int tipc_send2name(u32 ref,
1500           struct tipc_name const *name,
1501           unsigned int domain,
1502           unsigned int num_sect,
1503           struct iovec const *msg_sect)
1504{
1505    struct tipc_portid orig;
1506
1507    orig.ref = ref;
1508    orig.node = tipc_own_addr;
1509    return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig,
1510                 TIPC_PORT_IMPORTANCE);
1511}
1512
1513/**
1514 * tipc_forward_buf2name - forward message buffer to port name
1515 */
1516
1517int tipc_forward_buf2name(u32 ref,
1518              struct tipc_name const *name,
1519              u32 domain,
1520              struct sk_buff *buf,
1521              unsigned int dsz,
1522              struct tipc_portid const *orig,
1523              unsigned int importance)
1524{
1525    struct port *p_ptr;
1526    struct tipc_msg *msg;
1527    u32 destnode = domain;
1528    u32 destport = 0;
1529    int res;
1530
1531    p_ptr = (struct port *)tipc_ref_deref(ref);
1532    if (!p_ptr || p_ptr->publ.connected)
1533        return -EINVAL;
1534
1535    msg = &p_ptr->publ.phdr;
1536    if (importance <= TIPC_CRITICAL_IMPORTANCE)
1537        msg_set_importance(msg, importance);
1538    msg_set_type(msg, TIPC_NAMED_MSG);
1539    msg_set_orignode(msg, orig->node);
1540    msg_set_origport(msg, orig->ref);
1541    msg_set_nametype(msg, name->type);
1542    msg_set_nameinst(msg, name->instance);
1543    msg_set_lookup_scope(msg, addr_scope(domain));
1544    msg_set_hdr_sz(msg, LONG_H_SIZE);
1545    msg_set_size(msg, LONG_H_SIZE + dsz);
1546    destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1547    msg_set_destnode(msg, destnode);
1548    msg_set_destport(msg, destport);
1549    msg_dbg(msg, "forw2name ==> ");
1550    if (skb_cow(buf, LONG_H_SIZE))
1551        return -ENOMEM;
1552    skb_push(buf, LONG_H_SIZE);
1553    skb_copy_to_linear_data(buf, msg, LONG_H_SIZE);
1554    msg_dbg(buf_msg(buf),"PREP:");
1555    if (likely(destport || destnode)) {
1556        p_ptr->sent++;
1557        if (destnode == tipc_own_addr)
1558            return tipc_port_recv_msg(buf);
1559        res = tipc_send_buf_fast(buf, destnode);
1560        if (likely(res != -ELINKCONG))
1561            return res;
1562        if (port_unreliable(p_ptr))
1563            return dsz;
1564        return -ELINKCONG;
1565    }
1566    return tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
1567}
1568
1569/**
1570 * tipc_send_buf2name - send message buffer to port name
1571 */
1572
1573int tipc_send_buf2name(u32 ref,
1574               struct tipc_name const *dest,
1575               u32 domain,
1576               struct sk_buff *buf,
1577               unsigned int dsz)
1578{
1579    struct tipc_portid orig;
1580
1581    orig.ref = ref;
1582    orig.node = tipc_own_addr;
1583    return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig,
1584                     TIPC_PORT_IMPORTANCE);
1585}
1586
1587/**
1588 * tipc_forward2port - forward message sections to port identity
1589 */
1590
1591int tipc_forward2port(u32 ref,
1592              struct tipc_portid const *dest,
1593              unsigned int num_sect,
1594              struct iovec const *msg_sect,
1595              struct tipc_portid const *orig,
1596              unsigned int importance)
1597{
1598    struct port *p_ptr;
1599    struct tipc_msg *msg;
1600    int res;
1601
1602    p_ptr = tipc_port_deref(ref);
1603    if (!p_ptr || p_ptr->publ.connected)
1604        return -EINVAL;
1605
1606    msg = &p_ptr->publ.phdr;
1607    msg_set_type(msg, TIPC_DIRECT_MSG);
1608    msg_set_orignode(msg, orig->node);
1609    msg_set_origport(msg, orig->ref);
1610    msg_set_destnode(msg, dest->node);
1611    msg_set_destport(msg, dest->ref);
1612    msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1613    if (importance <= TIPC_CRITICAL_IMPORTANCE)
1614        msg_set_importance(msg, importance);
1615    p_ptr->sent++;
1616    if (dest->node == tipc_own_addr)
1617        return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1618    res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
1619    if (likely(res != -ELINKCONG))
1620        return res;
1621    if (port_unreliable(p_ptr)) {
1622        /* Just calculate msg length and return */
1623        return msg_calc_data_size(msg_sect, num_sect);
1624    }
1625    return -ELINKCONG;
1626}
1627
1628/**
1629 * tipc_send2port - send message sections to port identity
1630 */
1631
1632int tipc_send2port(u32 ref,
1633           struct tipc_portid const *dest,
1634           unsigned int num_sect,
1635           struct iovec const *msg_sect)
1636{
1637    struct tipc_portid orig;
1638
1639    orig.ref = ref;
1640    orig.node = tipc_own_addr;
1641    return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig,
1642                 TIPC_PORT_IMPORTANCE);
1643}
1644
1645/**
1646 * tipc_forward_buf2port - forward message buffer to port identity
1647 */
1648int tipc_forward_buf2port(u32 ref,
1649              struct tipc_portid const *dest,
1650              struct sk_buff *buf,
1651              unsigned int dsz,
1652              struct tipc_portid const *orig,
1653              unsigned int importance)
1654{
1655    struct port *p_ptr;
1656    struct tipc_msg *msg;
1657    int res;
1658
1659    p_ptr = (struct port *)tipc_ref_deref(ref);
1660    if (!p_ptr || p_ptr->publ.connected)
1661        return -EINVAL;
1662
1663    msg = &p_ptr->publ.phdr;
1664    msg_set_type(msg, TIPC_DIRECT_MSG);
1665    msg_set_orignode(msg, orig->node);
1666    msg_set_origport(msg, orig->ref);
1667    msg_set_destnode(msg, dest->node);
1668    msg_set_destport(msg, dest->ref);
1669    msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1670    if (importance <= TIPC_CRITICAL_IMPORTANCE)
1671        msg_set_importance(msg, importance);
1672    msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1673    if (skb_cow(buf, DIR_MSG_H_SIZE))
1674        return -ENOMEM;
1675
1676    skb_push(buf, DIR_MSG_H_SIZE);
1677    skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE);
1678    msg_dbg(msg, "buf2port: ");
1679    p_ptr->sent++;
1680    if (dest->node == tipc_own_addr)
1681        return tipc_port_recv_msg(buf);
1682    res = tipc_send_buf_fast(buf, dest->node);
1683    if (likely(res != -ELINKCONG))
1684        return res;
1685    if (port_unreliable(p_ptr))
1686        return dsz;
1687    return -ELINKCONG;
1688}
1689
1690/**
1691 * tipc_send_buf2port - send message buffer to port identity
1692 */
1693
1694int tipc_send_buf2port(u32 ref,
1695               struct tipc_portid const *dest,
1696               struct sk_buff *buf,
1697               unsigned int dsz)
1698{
1699    struct tipc_portid orig;
1700
1701    orig.ref = ref;
1702    orig.node = tipc_own_addr;
1703    return tipc_forward_buf2port(ref, dest, buf, dsz, &orig,
1704                     TIPC_PORT_IMPORTANCE);
1705}
1706
1707

Archive Download this file



interactive