comparison freeDiameter/p_out.c @ 34:0e2b57789361

Backup for the WE, some warnings remaining
author Sebastien Decugis <sdecugis@nict.go.jp>
date Fri, 30 Oct 2009 17:23:06 +0900
parents e6fcdf12b9a0
children 6486e97f56ae
comparison
equal deleted inserted replaced
33:e6fcdf12b9a0 34:0e2b57789361
34 *********************************************************************************************************/ 34 *********************************************************************************************************/
35 35
36 #include "fD.h" 36 #include "fD.h"
37 37
38 /* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */ 38 /* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
39 static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct fd_list * sentreq) 39 static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
40 { 40 {
41 TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, sentreq); 41 struct msg_hdr * hdr;
42 int msg_is_a_req;
43 uint8_t * buf;
44 size_t sz;
45 int ret;
42 46
43 TODO("If message is a request"); 47 TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, srl);
44 TODO("Alloc new *hbh");
45 48
46 TODO("Bufferize the message, send it"); 49 /* Retrieve the message header */
50 CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
47 51
48 TODO("Save in sentreq or free") 52 msg_is_a_req = (hdr->msg_flags & CMD_FLAG_REQUEST);
53 if (msg_is_a_req) {
54 CHECK_PARAMS(hbh && srl);
55 /* Alloc the hop-by-hop id and increment the value for next message */
56 hdr->msg_hbhid = *hbh;
57 *hbh = hdr->msg_hbhid + 1;
58 }
49 59
50 return ENOTSUP; 60 /* Create the message buffer */
61 CHECK_FCT(fd_msg_bufferize( *msg, &buf, &sz ));
62
63 /* Send the message */
64 pthread_cleanup_push( free, buf );
65 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), { free(buf); return ret; } );
66 pthread_cleanup_pop(1);
67
68 /* Save a request */
69 if (msg_is_a_req) {
70 CHECK_FCT_DO( fd_p_sr_store(srl, msg, &hdr->msg_hbhid),
71 {
72 fd_log_debug("The following request was sent successfully but not saved locally:\n" );
73 fd_log_debug(" (as a result the matching answer will be discarded)\n" );
74 fd_msg_dump_walk(NONE, *msg);
75 } );
76
77 }
78
79 /* Free answers and unsaved requests */
80 if (*msg) {
81 CHECK_FCT( fd_msg_free(*msg) );
82 *msg = NULL;
83 }
84
85 return 0;
86 }
87
88 static void cleanup_requeue(void * arg)
89 {
90 struct msg *msg = arg;
91 CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
92 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */));
51 } 93 }
52 94
53 /* The code of the "out" thread */ 95 /* The code of the "out" thread */
54 static void * out_thr(void * arg) 96 static void * out_thr(void * arg)
55 { 97 {
56 TODO("Pick next message in peer->p_tosend"); 98 struct fd_peer * peer = arg;
57 TODO("do_send, log errors"); 99 ASSERT( CHECK_PEER(peer) );
58 TODO("In case of cancellation, requeue the message"); 100
59 return NULL; 101 /* Set the thread name */
102 {
103 char buf[48];
104 sprintf(buf, "OUT/%.*s", sizeof(buf) - 5, peer->p_hdr.info.pi_diamid);
105 fd_log_threadname ( buf );
106 }
107
108 /* Loop until cancelation */
109 while (1) {
110 struct msg * msg;
111
112 /* Retrieve next message to send */
113 CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
114
115 /* Now if we are cancelled, we requeue this message */
116 pthread_cleanup_push(cleanup_requeue, msg);
117
118 /* Send the message, log any error */
119 CHECK_FCT_DO( do_send(&msg, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
120 {
121 fd_log_debug("An error occurred while sending this message, it is lost:\n");
122 fd_msg_dump_walk(NONE, msg);
123 fd_msg_free(msg);
124 } );
125
126 /* Loop */
127 pthread_cleanup_pop(0);
128 }
129
60 error: 130 error:
61 TODO(" Send an event to the peer "); 131 /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
132 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
62 return NULL; 133 return NULL;
63 } 134 }
64 135
65 /* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided */ 136 /* Wrapper to sending a message either by out thread (peer in OPEN state) or directly; cnx or peer must be provided */
66 int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer) 137 int fd_out_send(struct msg ** msg, struct cnxctx * cnx, struct fd_peer * peer)
81 152
82 if (!cnx) 153 if (!cnx)
83 cnx = peer->p_cnxctx; 154 cnx = peer->p_cnxctx;
84 155
85 /* Do send the message */ 156 /* Do send the message */
86 CHECK_FCT( do_send(msg, cnx, hbh, peer ? &peer->p_sentreq : NULL) ); 157 CHECK_FCT( do_send(msg, cnx, hbh, peer ? &peer->p_sr : NULL) );
87 } 158 }
88 159
89 return 0; 160 return 0;
90 } 161 }
91 162
"Welcome to our mercurial repository"