WvStreams
wvsubprocqueue.cc
/*
 * Worldvisions Weaver Software:
 *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
 *
 * A way to enqueue a series of WvSubProc objects.  See wvsubprocqueue.h.
 */
7#include "wvsubprocqueue.h"
8#include <unistd.h>
9#include <assert.h>
10
11
12WvSubProcQueue::WvSubProcQueue(unsigned _maxrunning)
13{
14 maxrunning = _maxrunning;
15}
16
17
18WvSubProcQueue::~WvSubProcQueue()
19{
20}
21
22
23void WvSubProcQueue::add(void *cookie, WvSubProc *proc)
24{
25 assert(proc);
26 assert(!proc->running);
27 if (cookie)
28 {
29 // search for other enqueued objects with this cookie
30 EntList::Iter i(waitq);
31 for (i.rewind(); i.next(); )
32 {
33 if (i->cookie == cookie)
34 {
35 // already enqueued; mark it as "redo" unless it's already
36 // the last one. That way we guarantee it'll still run
37 // in the future from now, and it'll come later than anything
38 // else in the queue, but it won't pointlessly run twice at
39 // the end.
40 Ent *e = i.ptr();
41 if (i.next())
42 e->redo = true;
43 delete proc;
44 return;
45 }
46 }
47 }
48
49 waitq.append(new Ent(cookie, proc), true);
50}
51
52
53void WvSubProcQueue::add(void *cookie,
54 const char *cmd, const char * const *argv)
55{
56 WvSubProc *p = new WvSubProc;
57 p->preparev(cmd, argv);
58 add(cookie, p);
59}
60
61
62bool WvSubProcQueue::cookie_running()
63{
64 EntList::Iter i(runq);
65 for (i.rewind(); i.next(); )
66 if (i->cookie)
67 return true;
68 return false;
69}
70
71
73{
74 int started = 0;
75
76 //fprintf(stderr, "go: %d waiting, %d running\n",
77 // waitq.count(), runq.count());
78
79 // first we need to clean up any finished processes
80 {
81 EntList::Iter i(runq);
82 for (i.rewind(); i.next(); )
83 {
84 Ent *e = i.ptr();
85
86 e->proc->wait(0, true);
87 if (!e->proc->running)
88 {
89 if (e->redo)
90 {
91 // someone re-enqueued this task while it was
92 // waiting/running
93 e->redo = false;
94 i.xunlink(false);
95 waitq.append(e, true);
96 }
97 else
98 i.xunlink();
99 }
100 }
101 }
102
103 while (!waitq.isempty() && runq.count() < maxrunning)
104 {
105 EntList::Iter i(waitq);
106 for (i.rewind(); i.next(); )
107 {
108 // elements with cookies are "sync points" in the queue;
109 // they guarantee that everything before that point has
110 // finished running before they run, and don't let anything
111 // after them run until they've finished.
112 if (i->cookie && !runq.isempty())
113 goto out;
114 if (cookie_running())
115 goto out;
116
117 // jump it into the running queue, but be careful not to
118 // delete the object when removing!
119 Ent *e = i.ptr();
120 i.xunlink(false);
121 runq.append(e, true);
122 e->proc->start_again();
123 started++;
124 break;
125 }
126 }
127
128out:
129 assert(runq.count() <= maxrunning);
130 return started;
131}
132
133
135{
136 return runq.count();
137}
138
139
141{
142 return runq.count() + waitq.count();
143}
144
145
147{
148 return runq.isempty() && waitq.isempty();
149}
150
151
153{
154 while (!isempty())
155 {
156 go();
157 if (!isempty())
158 usleep(100*1000);
159 }
160}
unsigned running() const
Return the number of currently running processes.
bool isempty() const
True if there are no unfinished (ie. running or waiting) processes.
unsigned remaining() const
Return the number of unfinished (ie. running or waiting) processes.
int go()
Clean up after any running processes in the queue, and start running additional processes if any are waiting.
WvSubProcQueue(unsigned _maxrunning)
Create a WvSubProcQueue.
void add(void *cookie, WvSubProc *proc)
Enqueue a process.
void finish()
Wait synchronously for all processes in the entire queue to finish.