WvStreams
wvdbusconn.cc
1/* -*- Mode: C++ -*-
2 * Worldvisions Weaver Software:
3 * Copyright (C) 2004-2006 Net Integration Technologies, Inc.
4 *
5 * Pathfinder Software:
6 * Copyright (C) 2007, Carillon Information Security Inc.
7 *
8 * This library is licensed under the LGPL, please read LICENSE for details.
9 *
10 */
11#include "wvdbusconn.h"
12#include "wvmoniker.h"
13#include "wvstrutils.h"
14#undef interface // windows
15#include <dbus/dbus.h>
16
17
18static WvString translate(WvStringParm dbus_moniker)
19{
21 WvStringList::Iter i(l);
22
23 if (!strncasecmp(dbus_moniker, "unix:", 5))
24 {
25 WvString path, tmpdir;
26 l.split(dbus_moniker+5, ",");
27 for (i.rewind(); i.next(); )
28 {
29 if (!strncasecmp(*i, "path=", 5))
30 path = *i + 5;
31 else if (!strncasecmp(*i, "abstract=", 9))
32 path = WvString("@%s", *i + 9);
33 else if (!strncasecmp(*i, "tmpdir=", 7))
34 tmpdir = *i + 7;
35 }
36 if (!!path)
37 return WvString("unix:%s", path);
38 else if (!!tmpdir)
39 return WvString("unix:%s/dbus.sock", tmpdir);
40 }
41 else if (!strncasecmp(dbus_moniker, "tcp:", 4))
42 {
43 WvString host, port, family;
44 l.split(dbus_moniker+4, ",");
45 for (i.rewind(); i.next(); )
46 {
47 if (!strncasecmp(*i, "family=", 7))
48 family = *i + 7;
49 else if (!strncasecmp(*i, "host=", 5))
50 host = *i + 5;
51 else if (!strncasecmp(*i, "port=", 5))
52 port = *i + 5;
53 }
54 if (!!host && !!port)
55 return WvString("tcp:%s:%s", host, port);
56 else if (!!host)
57 return WvString("tcp:%s", host);
58 else if (!!port)
59 return WvString("tcp:0.0.0.0:%s", port); // localhost
60 }
61
62 return dbus_moniker; // unrecognized
63}
64
65
66static IWvStream *stream_creator(WvStringParm _s, IObject *)
67{
68 WvString s(_s);
69
70 if (!strcasecmp(s, "starter"))
71 {
72 WvString startbus(getenv("DBUS_STARTER_ADDRESS"));
73 if (!!startbus)
74 return IWvStream::create(translate(startbus));
75 else
76 {
77 WvString starttype(getenv("DBUS_STARTER_BUS_TYPE"));
78 if (!!starttype && !strcasecmp(starttype, "system"))
79 s = "system";
80 else if (!!starttype && !strcasecmp(starttype, "session"))
81 s = "session";
82 }
83 }
84
85 if (!strcasecmp(s, "system"))
86 {
87 // NOTE: the environment variable for the address of the system
88 // bus is very often not set-- in that case, look in your dbus
89 // system bus config file (e.g. /etc/dbus-1/system.conf) for the
90 // raw address and either set this environment variable to that, or
91 // pass in the address directly
92 WvString bus(getenv("DBUS_SYSTEM_BUS_ADDRESS"));
93 if (!!bus)
94 return IWvStream::create(translate(bus));
95 }
96
97 if (!strcasecmp(s, "session"))
98 {
99 WvString bus(getenv("DBUS_SESSION_BUS_ADDRESS"));
100 if (!!bus)
101 return IWvStream::create(translate(bus));
102 }
103
104 return IWvStream::create(translate(s));
105}
106
107static WvMoniker<IWvStream> reg("dbus", stream_creator);
108
109
110static int conncount;
111
112WvDBusConn::WvDBusConn(IWvStream *_cloned, IWvDBusAuth *_auth, bool _client)
113 : WvStreamClone(_cloned),
114 log(WvString("DBus %s%s",
115 _client ? "" : "s",
116 ++conncount), WvLog::Debug5),
117 pending(10)
118{
119 init(_auth, _client);
120}
121
122
123WvDBusConn::WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth, bool _client)
124 : WvStreamClone(IWvStream::create(moniker)),
125 log(WvString("DBus %s%s",
126 _client ? "" : "s",
127 ++conncount), WvLog::Debug5),
128 pending(10)
129{
130 log("Connecting to '%s'\n", moniker);
131 init(_auth, _client);
132}
133
134
135void WvDBusConn::init(IWvDBusAuth *_auth, bool _client)
136{
137 log("Initializing.\n");
138 client = _client;
139 auth = _auth ? _auth : new WvDBusClientAuth;
140 authorized = in_post_select = false;
141 if (!client) set_uniquename(WvString(":%s.0", conncount));
142
143 if (!isok()) return;
144
145 delay_output(true);
146
147 // this will get enqueued until later, but we want to make sure it
148 // comes before anything the user tries to send - including anything
149 // goofy they enqueue in the authorization part.
150 if (client)
151 send_hello();
152
153 try_auth();
154}
155
157{
158 log("Shutting down.\n");
159 if (geterr())
160 log("Error was: %s\n", errstr());
161
162 close();
163
164 delete auth;
165}
166
167
169{
170 if (!closed)
171 log("Closing.\n");
173}
174
175
177{
178 return _uniquename;
179}
180
181
182void WvDBusConn::request_name(WvStringParm name, const WvDBusCallback &onreply,
183 time_t msec_timeout)
184{
185 uint32_t flags = (DBUS_NAME_FLAG_ALLOW_REPLACEMENT |
186 DBUS_NAME_FLAG_REPLACE_EXISTING);
187 WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
188 "org.freedesktop.DBus", "RequestName");
189 msg.append(name).append(flags);
190 send(msg, onreply, msec_timeout);
191}
192
193
195{
196 msg.marshal(out_queue);
197 if (authorized)
198 {
199 log(" >> %s\n", msg);
200 write(out_queue);
201 }
202 else
203 log(" .> %s\n", msg);
204 return msg.get_serial();
205}
206
207
208void WvDBusConn::send(WvDBusMsg msg, const WvDBusCallback &onreply,
209 time_t msec_timeout)
210{
211 send(msg);
212 if (onreply)
213 add_pending(msg, onreply, msec_timeout);
214}
215
216
218{
219public:
220 WvDBusMsg *reply;
221
223 { reply = NULL; }
225 { delete reply; }
226 bool reply_wait(WvDBusMsg &msg)
227 { reply = new WvDBusMsg(msg); return true; }
228};
229
230
232 wv::function<void(uint32_t)> serial_cb)
233{
234 xxReplyWaiter rw;
235
236 send(msg, wv::bind(&xxReplyWaiter::reply_wait, &rw, _1),
237 msec_timeout);
238 if (serial_cb)
239 serial_cb(msg.get_serial());
240 while (!rw.reply && isok())
241 runonce();
242 if (!rw.reply)
243 return WvDBusError(msg, DBUS_ERROR_FAILED,
244 WvString("Connection closed (%s) "
245 "while waiting for reply.",
246 errstr()));
247 else
248 return *rw.reply;
249}
250
251
252void WvDBusConn::out(WvStringParm s)
253{
254 log(" >> %s", s);
255 print(s);
256}
257
258
259const char *WvDBusConn::in()
260{
261 const char *s = trim_string(getline(0));
262 if (s)
263 log("<< %s\n", s);
264 return s;
265}
266
267
268void WvDBusConn::send_hello()
269{
270 WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
271 "org.freedesktop.DBus", "Hello");
272 send(msg, wv::bind(&WvDBusConn::_registered, this, _1));
273 WvDBusMsg msg2("org.freedesktop.DBus", "/org/freedesktop/DBus",
274 "org.freedesktop.DBus", "AddMatch");
275 msg2.append("type='signal'");
276 send(msg2); // don't need to monitor this for completion
277}
278
279
280void WvDBusConn::set_uniquename(WvStringParm s)
281{
282 // we want to print the message before switching log.app, so that we
283 // can trace which log.app turned into which
284 log("Assigned name '%s'\n", s);
285 _uniquename = s;
286 log.app = WvString("DBus %s%s", client ? "" : "s", uniquename());
287}
288
289
290void WvDBusConn::try_auth()
291{
292 bool done = auth->authorize(*this);
293 if (done)
294 {
295 // ready to send messages!
296 if (out_queue.used())
297 {
298 log(" >> (sending enqueued messages)\n");
299 write(out_queue);
300 }
301
302 authorized = true;
303 }
304}
305
306
307void WvDBusConn::add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie)
308{
309 callbacks.append(new CallbackInfo(pri, cb, cookie), true);
310}
311
312
313void WvDBusConn::del_callback(void *cookie)
314{
315 // remember, there might be more than one callback with the same cookie.
316 CallbackInfoList::Iter i(callbacks);
317 for (i.rewind(); i.next(); )
318 if (i->cookie == cookie)
319 i.xunlink();
320}
321
322
323int WvDBusConn::priority_order(const CallbackInfo *a, const CallbackInfo *b)
324{
325 return a->pri - b->pri;
326}
327
329{
330 log("<< %s\n", msg);
331
332 // handle replies
333 uint32_t rserial = msg.get_replyserial();
334 if (rserial)
335 {
336 Pending *p = pending[rserial];
337 if (p)
338 {
339 p->cb(msg);
340 pending.remove(p);
341 return true; // handled it
342 }
343 }
344
345 // handle all the generic filters
346 CallbackInfoList::Sorter i(callbacks, priority_order);
347 for (i.rewind(); i.next(); )
348 {
349 bool handled = i->cb(msg);
350 if (handled) return true;
351 }
352
353 return false; // couldn't handle the message, sorry
354}
355
356
357WvDBusClientAuth::WvDBusClientAuth()
358{
359 sent_request = false;
360}
361
362
363wvuid_t WvDBusClientAuth::get_uid()
364{
365 return wvgetuid();
366}
367
368
370{
371 if (!sent_request)
372 {
373 c.write("\0", 1);
374 WvString uid = get_uid();
375 c.out("AUTH EXTERNAL %s\r\n\0", WvHexEncoder().strflushstr(uid));
376 sent_request = true;
377 }
378 else
379 {
380 const char *line = c.in();
381 if (line)
382 {
383 if (!strncasecmp(line, "OK ", 3))
384 {
385 c.out("BEGIN\r\n");
386 return true;
387 }
388 else if (!strncasecmp(line, "ERROR ", 6))
389 c.seterr("Auth failed: %s", line);
390 else
391 c.seterr("Unknown AUTH response: '%s'", line);
392 }
393 }
394
395 return false;
396}
397
398
399time_t WvDBusConn::mintimeout_msec()
400{
401 WvTime when = 0;
402 PendingDict::Iter i(pending);
403 for (i.rewind(); i.next(); )
404 {
405 if (!when || when > i->valid_until)
406 when = i->valid_until;
407 }
408 if (!when)
409 return -1;
410 else if (when <= wvstime())
411 return 0;
412 else
413 return msecdiff(when, wvstime());
414}
415
416
417bool WvDBusConn::post_select(SelectInfo &si)
418{
419 bool ready = WvStreamClone::post_select(si);
420 if (si.inherit_request) return ready;
421
422 if (in_post_select) return false;
423 in_post_select = true;
424
425 if (!authorized && ready)
426 try_auth();
427
428 if (!alarm_remaining())
429 {
430 WvTime now = wvstime();
431 PendingDict::Iter i(pending);
432 for (i.rewind(); i.next(); )
433 {
434 if (now > i->valid_until)
435 {
436 log("Expiring %s\n", i->msg);
437 expire_pending(i.ptr());
438 i.rewind();
439 }
440 }
441 }
442
443 if (authorized && ready)
444 {
445 // put this in a loop so that wvdbusd can forward packets rapidly.
446 // Otherwise TCP_NODELAY kicks in, because we do a select() loop
447 // between packets, which causes delay_output() to flush.
448 bool ran;
449 do
450 {
451 ran = false;
452 size_t needed = WvDBusMsg::demarshal_bytes_needed(in_queue);
453 size_t amt = needed - in_queue.used();
454 if (amt < 4096)
455 amt = 4096;
456 read(in_queue, amt);
457 WvDBusMsg *m;
458 while ((m = WvDBusMsg::demarshal(in_queue)) != NULL)
459 {
460 ran = true;
461 filter_func(*m);
462 delete m;
463 }
464 } while (ran);
465 }
466
467 alarm(mintimeout_msec());
468 in_post_select = false;
469 return false;
470}
471
472
474{
475 return !out_queue.used() && pending.isempty();
476}
477
478
479void WvDBusConn::expire_pending(Pending *p)
480{
481 if (p)
482 {
483 WvDBusCallback xcb(p->cb);
484 pending.remove(p); // prevent accidental recursion
485 WvDBusError e(p->msg, DBUS_ERROR_FAILED,
486 "Timed out while waiting for reply");
487 xcb(e);
488 }
489}
490
491
492void WvDBusConn::cancel_pending(uint32_t serial)
493{
494 Pending *p = pending[serial];
495 if (p)
496 {
497 WvDBusCallback xcb(p->cb);
498 WvDBusMsg msg(p->msg);
499 pending.remove(p); // prevent accidental recursion
500 WvDBusError e(msg, DBUS_ERROR_FAILED,
501 "Canceled while waiting for reply");
502 xcb(e);
503 }
504}
505
506
507void WvDBusConn::add_pending(WvDBusMsg &msg, WvDBusCallback cb,
508 time_t msec_timeout)
509{
510 uint32_t serial = msg.get_serial();
511 assert(serial);
512 if (pending[serial])
513 cancel_pending(serial);
514 pending.add(new Pending(msg, cb, msec_timeout), true);
515 alarm(mintimeout_msec());
516}
517
518
519bool WvDBusConn::_registered(WvDBusMsg &msg)
520{
521 WvDBusMsg::Iter i(msg);
522 _uniquename = i.getnext().get_str();
523 set_uniquename(_uniquename);
524 return true;
525}
526
The basic interface which is included by all other XPLC interfaces and objects.
Definition IObject.h:65
virtual bool authorize(WvDBusConn &c)=0
Main action callback.
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition wvbufbase.h:92
virtual bool authorize(WvDBusConn &c)
Main action callback.
virtual void close()
Close the underlying stream.
virtual bool filter_func(WvDBusMsg &msg)
Called by for each received message.
virtual ~WvDBusConn()
Release this connection.
WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth=NULL, bool _client=true)
Creates a new dbus connection using the given WvStreams moniker.
CallbackPri
The priority level of a callback registration.
Definition wvdbusconn.h:170
WvDBusMsg send_and_wait(WvDBusMsg msg, time_t msec_timeout=WVDBUS_DEFAULT_TIMEOUT, wv::function< void(uint32_t)> serial_cb=0)
Send a message on the bus and wait for a reply to come in, returning the message when it does.
void request_name(WvStringParm name, const WvDBusCallback &onreply=0, time_t msec_timeout=WVDBUS_DEFAULT_TIMEOUT)
Request the given service name on DBus.
bool isidle()
Returns true if there are no outstanding messages that have not received (or timed out) their reply.
uint32_t send(WvDBusMsg msg)
Send a message on the bus, not expecting any reply.
void add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie=NULL)
Adds a callback to the connection: all received messages will be sent to all callbacks to look at and...
void del_callback(void *cookie)
Delete all callbacks that have the given cookie.
WvString uniquename() const
Return this connection's unique name on the bus, assigned by the server at connect time.
WvDBusMsg & append(const char *s)
The following methods are designed to allow appending various arguments to the message.
Definition wvdbusmsg.cc:461
static WvDBusMsg * demarshal(WvBuf &buf)
Demarshals a new WvDBusMsg from a buffer containing its binary DBus protocol representation.
static size_t demarshal_bytes_needed(WvBuf &buf)
Given a buffer containing what might be the header of a DBus message, checks how many bytes need to b...
void marshal(WvBuf &buf)
Locks this message, encodes it in DBus binary protocol format, and adds it to the given buffer.
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Definition wvstring.h:94
A hex encoder.
Definition wvhex.h:22
A WvLog stream accepts log messages from applications and forwards them to all registered WvLogRcv's.
Definition wvlog.h:57
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
Definition wvmoniker.h:62
WvStreamClone simply forwards all requests to the "cloned" stream.
virtual void close()
Close this stream.
virtual bool isok() const
return true if the stream is actually usable right now
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
void delay_output(bool is_delayed)
force write() to always buffer output.
Definition wvstream.h:246
void alarm(time_t msec_timeout)
set an alarm, ie.
Definition wvstream.cc:1049
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition wvstream.cc:532
void runonce(time_t msec_timeout=-1)
Exactly the same as: if (select(timeout)) callback();.
Definition wvstream.h:391
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
Definition wvstream.h:175
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition wvstream.cc:490
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition wvstream.cc:1058
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition wvstream.cc:451
This is a WvList of WvStrings, and is a really handy way to parse strings.
void split(WvStringParm s, const char *splitchars=" \t\r\n", int limit=0)
split s and form a list ignoring splitchars (except at beginning and end) ie.
WvString is an implementation of a simple and efficient printable-string class.
Definition wvstring.h:330
Based on (and interchangeable with) struct timeval.
Definition wvtimeutils.h:18
Various little string functions.
char * trim_string(char *string)
Trims whitespace from the beginning and end of the character string, including carriage return / line...
Definition strutils.cc:59