libdap Updated for version 3.18.1
XDRStreamMarshaller.cc
1// XDRStreamMarshaller.cc
2
3// -*- mode: c++; c-basic-offset:4 -*-
4
5// This file is part of libdap, A C++ implementation of the OPeNDAP Data
6// Access Protocol.
7
8// Copyright (c) 2002,2003,2016 OPeNDAP, Inc.
9// Author: Patrick West <pwest@ucar.edu>
10// James Gallagher <jgallagher@opendap.org>
11//
12// This library is free software; you can redistribute it and/or
13// modify it under the terms of the GNU Lesser General Public
14// License as published by the Free Software Foundation; either
15// version 2.1 of the License, or (at your option) any later version.
16//
17// This library is distributed in the hope that it will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20// Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public
23// License along with this library; if not, write to the Free Software
24// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25//
26// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
27
28// (c) COPYRIGHT URI/MIT 1994-1999
29// Please read the full copyright statement in the file COPYRIGHT_URI.
30//
31// Authors:
32// pwest Patrick West <pwest@ucar.edu>
33
34
35#include "config.h"
36
37#ifdef HAVE_PTHREAD_H
38#include <pthread.h>
39#endif
40
41#include <cassert>
42
43#include <iostream>
44#include <sstream>
45#include <iomanip>
46
47// #define DODS_DEBUG
48
49#include "XDRStreamMarshaller.h"
50#ifdef USE_POSIX_THREADS
51#include "MarshallerThread.h"
52#endif
53#include "Vector.h"
54#include "XDRUtils.h"
55#include "util.h"
56
57#include "debug.h"
58
59using namespace std;
60
61// Build this code so it does not use pthreads to write some kinds of
62// data (see the put_vector() and put_vector_part() methods) in a child thread.
63// #undef USE_POSIX_THREADS
64
65namespace libdap {
66
// Definition of the static encoding buffer d_buf. It starts out null; the
// constructor allocates it with malloc() the first time it finds it unset.
67char *XDRStreamMarshaller::d_buf = 0;
// Size in bytes used when allocating d_buf and when creating the XDR memory
// sink over it; put_opaque() also rejects any len larger than this value.
68static const int XDR_DAP_BUFF_SIZE=256;
69
70
79XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
80 d_out(out), d_partial_put_byte_count(0), tm(0)
81{
82 if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE);
83 if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization.");
84
85 xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
86
87#ifdef USE_POSIX_THREADS
88 tm = new MarshallerThread;
89#endif
90}
91
92XDRStreamMarshaller::~XDRStreamMarshaller()
93{
94 delete tm;
95
96 xdr_destroy(&d_sink);
97}
98
99void XDRStreamMarshaller::put_byte(dods_byte val)
100{
101 if (!xdr_setpos(&d_sink, 0))
102 throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");
103
104 if (!xdr_char(&d_sink, (char *) &val))
105 throw Error(
106 "Network I/O Error. Could not send byte data.");
107
108 unsigned int bytes_written = xdr_getpos(&d_sink);
109 if (!bytes_written)
110 throw Error(
111 "Network I/O Error. Could not send byte data - unable to get stream position.");
112
113#ifdef USE_POSIX_THREADS
114 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
115#endif
116
117 d_out.write(d_buf, bytes_written);
118}
119
120void XDRStreamMarshaller::put_int16(dods_int16 val)
121{
122 if (!xdr_setpos(&d_sink, 0))
123 throw Error(
124 "Network I/O Error. Could not send int 16 data - unable to set stream position.");
125
126 if (!XDR_INT16(&d_sink, &val))
127 throw Error(
128 "Network I/O Error. Could not send int 16 data.");
129
130 unsigned int bytes_written = xdr_getpos(&d_sink);
131 if (!bytes_written)
132 throw Error(
133 "Network I/O Error. Could not send int 16 data - unable to get stream position.");
134
135#ifdef USE_POSIX_THREADS
136 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
137#endif
138
139 d_out.write(d_buf, bytes_written);
140}
141
142void XDRStreamMarshaller::put_int32(dods_int32 val)
143{
144 if (!xdr_setpos(&d_sink, 0))
145 throw Error(
146 "Network I/O Error. Could not send int 32 data - unable to set stream position.");
147
148 if (!XDR_INT32(&d_sink, &val))
149 throw Error(
150 "Network I/O Error. Culd not read int 32 data.");
151
152 unsigned int bytes_written = xdr_getpos(&d_sink);
153 if (!bytes_written)
154 throw Error(
155 "Network I/O Error. Could not send int 32 data - unable to get stream position.");
156
157#ifdef USE_POSIX_THREADS
158 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
159#endif
160
161 d_out.write(d_buf, bytes_written);
162}
163
164void XDRStreamMarshaller::put_float32(dods_float32 val)
165{
166 if (!xdr_setpos(&d_sink, 0))
167 throw Error(
168 "Network I/O Error. Could not send float 32 data - unable to set stream position.");
169
170 if (!xdr_float(&d_sink, &val))
171 throw Error(
172 "Network I/O Error. Could not send float 32 data.");
173
174 unsigned int bytes_written = xdr_getpos(&d_sink);
175 if (!bytes_written)
176 throw Error(
177 "Network I/O Error. Could not send float 32 data - unable to get stream position.");
178
179#ifdef USE_POSIX_THREADS
180 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
181#endif
182
183 d_out.write(d_buf, bytes_written);
184}
185
186void XDRStreamMarshaller::put_float64(dods_float64 val)
187{
188 if (!xdr_setpos(&d_sink, 0))
189 throw Error(
190 "Network I/O Error. Could not send float 64 data - unable to set stream position.");
191
192 if (!xdr_double(&d_sink, &val))
193 throw Error(
194 "Network I/O Error. Could not send float 64 data.");
195
196 unsigned int bytes_written = xdr_getpos(&d_sink);
197 if (!bytes_written)
198 throw Error(
199 "Network I/O Error. Could not send float 64 data - unable to get stream position.");
200
201#ifdef USE_POSIX_THREADS
202 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
203#endif
204
205 d_out.write(d_buf, bytes_written);
206}
207
208void XDRStreamMarshaller::put_uint16(dods_uint16 val)
209{
210 if (!xdr_setpos(&d_sink, 0))
211 throw Error(
212 "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
213
214 if (!XDR_UINT16(&d_sink, &val))
215 throw Error(
216 "Network I/O Error. Could not send uint 16 data.");
217
218 unsigned int bytes_written = xdr_getpos(&d_sink);
219 if (!bytes_written)
220 throw Error(
221 "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
222
223#ifdef USE_POSIX_THREADS
224 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
225#endif
226
227 d_out.write(d_buf, bytes_written);
228}
229
230void XDRStreamMarshaller::put_uint32(dods_uint32 val)
231{
232 if (!xdr_setpos(&d_sink, 0))
233 throw Error(
234 "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
235
236 if (!XDR_UINT32(&d_sink, &val))
237 throw Error(
238 "Network I/O Error. Could not send uint 32 data.");
239
240 unsigned int bytes_written = xdr_getpos(&d_sink);
241 if (!bytes_written)
242 throw Error(
243 "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
244
245#ifdef USE_POSIX_THREADS
246 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
247#endif
248
249 d_out.write(d_buf, bytes_written);
250}
251
252void XDRStreamMarshaller::put_str(const string &val)
253{
254 int size = val.length() + 8;
255
256 XDR str_sink;
257 vector<char> str_buf(size);
258
259 try {
260 xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);
261
262 if (!xdr_setpos(&str_sink, 0))
263 throw Error(
264 "Network I/O Error. Could not send string data - unable to set stream position.");
265
266 const char *out_tmp = val.c_str();
267 if (!xdr_string(&str_sink, (char **) &out_tmp, size))
268 throw Error(
269 "Network I/O Error. Could not send string data.");
270
271 unsigned int bytes_written = xdr_getpos(&str_sink);
272 if (!bytes_written)
273 throw Error(
274 "Network I/O Error. Could not send string data - unable to get stream position.");
275
276#ifdef USE_POSIX_THREADS
277 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
278#endif
279
280 d_out.write(&str_buf[0], bytes_written);
281
282 xdr_destroy(&str_sink);
283 }
284 catch (...) {
285 xdr_destroy(&str_sink);
286 throw;
287 }
288}
289
290void XDRStreamMarshaller::put_url(const string &val)
291{
292 put_str(val);
293}
294
295void XDRStreamMarshaller::put_opaque(char *val, unsigned int len)
296{
297 if (len > XDR_DAP_BUFF_SIZE)
298 throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
299
300 if (!xdr_setpos(&d_sink, 0))
301 throw Error(
302 "Network I/O Error. Could not send opaque data - unable to set stream position.");
303
304 if (!xdr_opaque(&d_sink, val, len))
305 throw Error(
306 "Network I/O Error. Could not send opaque data.");
307
308 unsigned int bytes_written = xdr_getpos(&d_sink);
309 if (!bytes_written)
310 throw Error(
311 "Network I/O Error. Could not send opaque data - unable to get stream position.");
312
313#ifdef USE_POSIX_THREADS
314 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
315#endif
316
317 d_out.write(d_buf, bytes_written);
318}
319
320void XDRStreamMarshaller::put_int(int val)
321{
322 if (!xdr_setpos(&d_sink, 0))
323 throw Error(
324 "Network I/O Error. Could not send int data - unable to set stream position.");
325
326 if (!xdr_int(&d_sink, &val))
327 throw Error(
328 "Network I/O Error(1). Could not send int data.");
329
330 unsigned int bytes_written = xdr_getpos(&d_sink);
331 if (!bytes_written)
332 throw Error(
333 "Network I/O Error. Could not send int data - unable to get stream position.");
334
335#ifdef USE_POSIX_THREADS
336 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
337#endif
338
339 d_out.write(d_buf, bytes_written);
340}
341
342void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec)
343{
344 put_vector(val, num, width, vec.var()->type());
345}
346
347
// NOTE(review): the signature line for this body is missing from this listing;
// presumably it belongs to put_vector_start(int num) - confirm against the header.
//
// Sends the element count num twice with put_int(), then zeroes
// d_partial_put_byte_count, the running total that put_vector_part() adds to.
// Sending the count twice matches the one-shot put_vector(), which sends it
// once with put_int() and once more as part of the XDR-encoded array.
356{
357 put_int(num);
358 put_int(num);
359
360 d_partial_put_byte_count = 0;
361}
362
// NOTE(review): the signature line for this body is missing from this listing;
// presumably it is the method that finishes a put_vector_part() sequence
// (comments in put_vector_part() call it put_vector_last()) - confirm against
// the header.
//
// Writes the zero bytes needed to bring the total accumulated in
// d_partial_put_byte_count up to a multiple of four; writes nothing when the
// total is already a multiple of four. Throws Error if the stream reports
// failure after the padding is written.
370{
371#ifdef USE_POSIX_THREADS
372 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
373#endif
374
375 // Compute the trailing (padding) bytes
376
377 // Note that the XDR standard pads values to 4 byte boundaries.
378 //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
// The bit mask below is the same computation as the commented-out % 4 form.
379 unsigned int mod_4 = d_partial_put_byte_count & 0x03;
380 unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
381
382 if (pad) {
383 vector<char> padding(4, 0); // 4 zeros
384
// Only the first pad (1 to 3) of the four zero bytes are sent.
385 d_out.write(&padding[0], pad);
386 if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding");
387 }
388}
389
390// Start of parallel I/O support. jhrg 8/19/15
391void XDRStreamMarshaller::put_vector(char *val, int num, Vector &)
392{
393 if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");
394
395 // write the number of members of the array being written and then set the position to 0
396 put_int(num);
397
398 // this is the word boundary for writing xdr bytes in a vector.
399 const unsigned int add_to = 8;
400 // switch to memory on the heap since the thread will need to access it
401 // after this code returns.
402 char *byte_buf = new char[num + add_to];
403 XDR byte_sink;
404 try {
405 xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
406 if (!xdr_setpos(&byte_sink, 0))
407 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
408
409 if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to))
410 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
411
412 unsigned int bytes_written = xdr_getpos(&byte_sink);
413 if (!bytes_written)
414 throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");
415
416#ifdef USE_POSIX_THREADS
417 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
418 tm->increment_child_thread_count();
419 tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
420 xdr_destroy(&byte_sink);
421#else
422 d_out.write(byte_buf, bytes_written);
423 xdr_destroy(&byte_sink);
424 delete [] byte_buf;
425#endif
426
427 }
428 catch (...) {
429 DBG(cerr << "Caught an exception in put_vector_thread" << endl);
430 xdr_destroy(&byte_sink);
431 delete [] byte_buf;
432 throw;
433 }
434}
435
436// private
447void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type)
448{
449 assert(val || num == 0);
450
451 // write the number of array members being written, then set the position back to 0
452 put_int(num);
453
454 if (num == 0)
455 return;
456
457 int use_width = width;
458 if (use_width < 4) use_width = 4;
459
460 // the size is the number of elements num times the width of each
461 // element, then add 4 bytes for the number of elements
462 int size = (num * use_width) + 4;
463
464 // allocate enough memory for the elements
465 //vector<char> vec_buf(size);
466 char *vec_buf = new char[size];
467 XDR vec_sink;
468 try {
469 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
470
471 // set the position of the sink to 0, we're starting at the beginning
472 if (!xdr_setpos(&vec_sink, 0))
473 throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
474
475 // write the array to the buffer
476 if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
477 throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");
478
479 // how much was written to the buffer
480 unsigned int bytes_written = xdr_getpos(&vec_sink);
481 if (!bytes_written)
482 throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");
483
484#ifdef USE_POSIX_THREADS
485 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
486 tm->increment_child_thread_count();
487 tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
488 xdr_destroy(&vec_sink);
489#else
490 d_out.write(vec_buf, bytes_written);
491 xdr_destroy(&vec_sink);
492 delete [] vec_buf;
493#endif
494 }
495 catch (...) {
496 xdr_destroy(&vec_sink);
497 delete [] vec_buf;
498 throw;
499 }
500}
501
513void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
514{
515 if (width == 1) {
516 // Add space for the 4 bytes of length info and 4 bytes for padding, even though
517 // we will not send either of those.
518 const unsigned int add_to = 8;
519 unsigned int bufsiz = num + add_to;
520 //vector<char> byte_buf(bufsiz);
521 char *byte_buf = new char[bufsiz];
522 XDR byte_sink;
523 try {
524 xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
525 if (!xdr_setpos(&byte_sink, 0))
526 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
527
528 if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz))
529 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
530
531#ifdef USE_POSIX_THREADS
532 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
533 tm->increment_child_thread_count();
534
535 // Increment the element count so we can figure out about the padding in put_vector_last()
536 d_partial_put_byte_count += num;
537
538 tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
539 xdr_destroy(&byte_sink);
540#else
541 // Only send the num bytes that follow the 4 bytes of length info - we skip the
542 // length info because it's already been sent and we don't send any trailing padding
543 // bytes in this method (see put_vector_last() for that).
544 d_out.write(byte_buf + 4, num);
545
546 if (d_out.fail())
547 throw Error ("Network I/O Error. Could not send initial part of byte vector data");
548
549 // Now increment the element count so we can figure out about the padding in put_vector_last()
550 d_partial_put_byte_count += num;
551
552 xdr_destroy(&byte_sink);
553 delete [] byte_buf;
554#endif
555 }
556 catch (...) {
557 xdr_destroy(&byte_sink);
558 delete [] byte_buf;
559 throw;
560 }
561 }
562 else {
563 int use_width = (width < 4) ? 4 : width;
564
565 // the size is the number of elements num times the width of each
566 // element, then add 4 bytes for the (int) number of elements
567 int size = (num * use_width) + 4;
568
569 // allocate enough memory for the elements
570 //vector<char> vec_buf(size);
571 char *vec_buf = new char[size];
572 XDR vec_sink;
573 try {
574 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
575
576 // set the position of the sink to 0, we're starting at the beginning
577 if (!xdr_setpos(&vec_sink, 0))
578 throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
579
580 // write the array to the buffer
581 if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
582 throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");
583
584#ifdef USE_POSIX_THREADS
585 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
586 tm->increment_child_thread_count();
587
588 // Increment the element count so we can figure out about the padding in put_vector_last()
589 d_partial_put_byte_count += (size - 4);
590 tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
591 xdr_destroy(&vec_sink);
592#else
593 // write that much out to the output stream, skipping the length data that
594 // XDR writes since we have already written the length info using put_vector_start()
595 d_out.write(vec_buf + 4, size - 4);
596
597 if (d_out.fail())
598 throw Error ("Network I/O Error. Could not send part of vector data");
599
600 // Now increment the element count so we can figure out about the padding in put_vector_last()
601 d_partial_put_byte_count += (size - 4);
602
603 xdr_destroy(&vec_sink);
604 delete [] vec_buf;
605#endif
606 }
607 catch (...) {
608 xdr_destroy(&vec_sink);
609 delete [] vec_buf;
610 throw;
611 }
612 }
613}
614
615void XDRStreamMarshaller::dump(ostream &strm) const
616{
617 strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl;
618}
619
620} // namespace libdap
621
A class for error processing.
Definition: Error.h:91
A class for software fault reporting.
Definition: InternalErr.h:65
static void * write_thread(void *arg)
static void * write_thread_part(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
Holds a one-dimensional collection of DAP2 data types.
Definition: Vector.h:81
virtual void dump(ostream &strm) const
dump the contents of this object to the specified ostream
virtual void put_vector_start(int num)
virtual void put_vector_part(char *val, unsigned int num, int width, Type type)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
Definition: XDRUtils.cc:145
Type
Identifies the data type.
Definition: Type.h:94