libdap Updated for version 3.18.1
MarshallerThread.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of libdap, A C++ implementation of the OPeNDAP Data
4// Access Protocol.
5
6// Copyright (c) 2015 OPeNDAP, Inc.
7// Author: James Gallagher <jgallagher@opendap.org>
8//
9// This library is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// This library is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22//
23// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24
25/*
26 * MarshallerThread.cc
27 *
28 * Created on: Aug 27, 2015
29 * Author: jimg
30 */
31
32#include "config.h"
33
34#include <pthread.h>
35#include <sys/time.h>
36#include <fcntl.h>
37#include <unistd.h>
38
39#include <ostream>
40#include <sstream>
41
42#include "MarshallerThread.h"
43#include "Error.h"
44#include "InternalErr.h"
45#include "debug.h"
46
47using namespace libdap;
48using namespace std;
49
50#if 0
51bool MarshallerThread::print_time = false;
52
/**
 * Compute the elapsed time between two timeval samples.
 *
 * Despite the name, the value returned is in seconds (with a fractional
 * part), not hundredths of a second: it is the difference of the tv_sec
 * fields plus the difference of the tv_usec fields divided by 1,000,000.
 *
 * Neither argument is modified; the carry/borrow normalization is done on a
 * local copy of the start time.
 *
 * @param stop The later time sample.
 * @param start The earlier time sample.
 * @return stop - start, in seconds.
 */
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start)
{
    // Normalize a private copy so the caller's start time is left untouched.
    struct timeval begin = *start;

    // Borrow whole seconds when the stop microseconds are smaller than the
    // start microseconds, so the microsecond difference is non-negative.
    if (stop->tv_usec < begin.tv_usec) {
        const long carry = (begin.tv_usec - stop->tv_usec) / 1000000 + 1;
        begin.tv_usec -= 1000000 * carry;
        begin.tv_sec += carry;
    }

    // Fold any surplus of more than one second of microseconds back into
    // the seconds field (only possible with un-normalized input).
    if (stop->tv_usec - begin.tv_usec > 1000000) {
        const long carry = (stop->tv_usec - begin.tv_usec) / 1000000;
        begin.tv_usec += 1000000 * carry;
        begin.tv_sec -= carry;
    }

    double result = stop->tv_sec - begin.tv_sec;
    result += double(stop->tv_usec - begin.tv_usec) / 1000000;
    return result;
}
76#endif
77
78
88Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
89 m_mutex(lock)
90{
91 int status = pthread_mutex_lock(&m_mutex);
92
93 DBG(cerr << "Locking the mutex! (waiting; " << pthread_self() << ")" << endl);
94
95 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
96 while (count != 0) {
97 status = pthread_cond_wait(&cond, &m_mutex);
98 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
99 }
100 if (count != 0) throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
101
102 DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
103}
104
109{
110 DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
111
112 int status = pthread_mutex_unlock(&m_mutex);
113 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
114}
115
116
130ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) :
131 m_mutex(lock), m_cond(cond), m_count(count)
132{
133 int status = pthread_mutex_lock(&m_mutex);
134
135 DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
136
137 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
138
139 DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
140}
141
142ChildLocker::~ChildLocker()
143{
144 DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);
145
146 m_count = 0;
147 int status = pthread_cond_signal(&m_cond);
148 if (status != 0)
149 throw InternalErr(__FILE__, __LINE__, "Could not signal main thread from ChildLocker!");
150
151 status = pthread_mutex_unlock(&m_mutex);
152 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
153}
154
155MarshallerThread::MarshallerThread() :
156 d_thread(0), d_child_thread_count(0)
157{
158 if (pthread_attr_init(&d_thread_attr) != 0) throw Error(internal_error, "Failed to initialize pthread attributes.");
159 if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0)
160 throw Error(internal_error, "Failed to complete pthread attribute initialization.");
161
162 if (pthread_mutex_init(&d_out_mutex, 0) != 0) throw Error(internal_error, "Failed to initialize mutex.");
163 if (pthread_cond_init(&d_out_cond, 0) != 0) throw Error(internal_error, "Failed to initialize cond.");
164}
165
166MarshallerThread::~MarshallerThread()
167{
168 int status = pthread_mutex_lock(&d_out_mutex);
169 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
170 while (d_child_thread_count != 0) {
171 status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
172 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
173 }
174 if (d_child_thread_count != 0)
175 throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");
176
177 status = pthread_mutex_unlock(&d_out_mutex);
178 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
179
180 pthread_mutex_destroy(&d_out_mutex);
181 pthread_cond_destroy(&d_out_cond);
182
183 pthread_attr_destroy(&d_thread_attr);
184}
185
186// not a static method
192void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf,
193 unsigned int bytes)
194{
195 write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf,
196 bytes);
197 int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
198 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
199}
200
204void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes)
205{
206 write_args *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf,
207 bytes);
208 int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
209 if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
210}
211
221void *
223{
224 write_args *args = reinterpret_cast<write_args *>(arg);
225
226 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
227
228#if 0
229 struct timeval tp_s;
230 if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
231#endif
232
233 // force an error
234 // return (void*)-1;
235
236 if (args->d_out_file != -1) {
237 int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
238 if (bytes_written != args->d_num)
239 return (void*) -1;
240 }
241 else {
242 args->d_out.write(args->d_buf, args->d_num);
243 if (args->d_out.fail()) {
244 ostringstream oss;
245 oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
246 args->d_error = oss.str();
247 return (void*) -1;
248 }
249 }
250
251 delete [] args->d_buf;
252 delete args;
253
254#if 0
255 struct timeval tp_e;
256 if (print_time) {
257 if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;
258
259 cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
260 }
261#endif
262
263 return 0;
264}
265
278void *
280{
281 write_args *args = reinterpret_cast<write_args *>(arg);
282
283 ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit
284
285 if (args->d_out_file != -1) {
286 int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
287 if (bytes_written != args->d_num) return (void*) -1;
288 }
289 else {
290 args->d_out.write(args->d_buf + 4, args->d_num);
291 if (args->d_out.fail()) {
292 ostringstream oss;
293 oss << "Could not write data: " << __FILE__ << ":" << __LINE__;
294 args->d_error = oss.str();
295 return (void*) -1;
296 }
297 }
298
299 delete [] args->d_buf;
300 delete args;
301
302 return 0;
303}
304
A class for error processing.
Definition: Error.h:91
A class for software fault reporting.
Definition: InternalErr.h:65
static void * write_thread(void *arg)
static void * write_thread_part(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)