libdap  Updated for version 3.18.1
XDRStreamMarshaller.cc
1 // XDRStreamMarshaller.cc
2 
3 // -*- mode: c++; c-basic-offset:4 -*-
4 
5 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
6 // Access Protocol.
7 
8 // Copyright (c) 2002,2003,2016 OPeNDAP, Inc.
9 // Author: Patrick West <pwest@ucar.edu>
10 // James Gallagher <jgallagher@opendap.org>
11 //
12 // This library is free software; you can redistribute it and/or
13 // modify it under the terms of the GNU Lesser General Public
14 // License as published by the Free Software Foundation; either
15 // version 2.1 of the License, or (at your option) any later version.
16 //
17 // This library is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 // Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public
23 // License along with this library; if not, write to the Free Software
24 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25 //
26 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
27 
28 // (c) COPYRIGHT URI/MIT 1994-1999
29 // Please read the full copyright statement in the file COPYRIGHT_URI.
30 //
31 // Authors:
32 // pwest Patrick West <pwest@ucar.edu>
33 
34 
35 #include "config.h"
36 
37 #ifdef HAVE_PTHREAD_H
38 #include <pthread.h>
39 #endif
40 
41 #include <cassert>
42 
43 #include <iostream>
44 #include <sstream>
45 #include <iomanip>
46 
47 // #define DODS_DEBUG
48 
49 #include "XDRStreamMarshaller.h"
50 #ifdef USE_POSIX_THREADS
51 #include "MarshallerThread.h"
52 #endif
53 #include "Vector.h"
54 #include "XDRUtils.h"
55 #include "util.h"
56 
57 #include "debug.h"
58 
59 using namespace std;
60 
61 // Build this code so it does not use pthreads to write some kinds of
62 // data (see the put_vector() and put_vector_part() methods) in a child thread.
63 // #undef USE_POSIX_THREADS
64 
65 namespace libdap {
66 
// Class-wide scratch buffer used for XDR encoding. It starts out null; the
// constructor allocates it with malloc() the first time it finds it unset and
// attaches each instance's d_sink to it.
67 char *XDRStreamMarshaller::d_buf = 0;
// Size, in bytes, of d_buf. put_opaque() also uses it as the upper limit on
// the length of opaque data it will accept.
68 static const int XDR_DAP_BUFF_SIZE=256;
69 
70 
79 XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
80  d_out(out), d_partial_put_byte_count(0), tm(0)
81 {
82  if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE);
83  if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization.");
84 
85  xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
86 
87 #ifdef USE_POSIX_THREADS
88  tm = new MarshallerThread;
89 #endif
90 }
91 
/**
 * Release the per-instance resources: the MarshallerThread helper and the
 * XDR sink. The class-wide buffer d_buf is not freed here.
 */
92 XDRStreamMarshaller::~XDRStreamMarshaller()
93 {
 // tm is null unless the constructor was built with USE_POSIX_THREADS;
 // deleting a null pointer is a no-op.
94  delete tm;
95 
96  xdr_destroy(&d_sink);
97 }
98 
99 void XDRStreamMarshaller::put_byte(dods_byte val)
100 {
101  if (!xdr_setpos(&d_sink, 0))
102  throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");
103 
104  if (!xdr_char(&d_sink, (char *) &val))
105  throw Error(
106  "Network I/O Error. Could not send byte data.");
107 
108  unsigned int bytes_written = xdr_getpos(&d_sink);
109  if (!bytes_written)
110  throw Error(
111  "Network I/O Error. Could not send byte data - unable to get stream position.");
112 
113 #ifdef USE_POSIX_THREADS
114  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
115 #endif
116 
117  d_out.write(d_buf, bytes_written);
118 }
119 
120 void XDRStreamMarshaller::put_int16(dods_int16 val)
121 {
122  if (!xdr_setpos(&d_sink, 0))
123  throw Error(
124  "Network I/O Error. Could not send int 16 data - unable to set stream position.");
125 
126  if (!XDR_INT16(&d_sink, &val))
127  throw Error(
128  "Network I/O Error. Could not send int 16 data.");
129 
130  unsigned int bytes_written = xdr_getpos(&d_sink);
131  if (!bytes_written)
132  throw Error(
133  "Network I/O Error. Could not send int 16 data - unable to get stream position.");
134 
135 #ifdef USE_POSIX_THREADS
136  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
137 #endif
138 
139  d_out.write(d_buf, bytes_written);
140 }
141 
142 void XDRStreamMarshaller::put_int32(dods_int32 val)
143 {
144  if (!xdr_setpos(&d_sink, 0))
145  throw Error(
146  "Network I/O Error. Could not send int 32 data - unable to set stream position.");
147 
148  if (!XDR_INT32(&d_sink, &val))
149  throw Error(
150  "Network I/O Error. Culd not read int 32 data.");
151 
152  unsigned int bytes_written = xdr_getpos(&d_sink);
153  if (!bytes_written)
154  throw Error(
155  "Network I/O Error. Could not send int 32 data - unable to get stream position.");
156 
157 #ifdef USE_POSIX_THREADS
158  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
159 #endif
160 
161  d_out.write(d_buf, bytes_written);
162 }
163 
164 void XDRStreamMarshaller::put_float32(dods_float32 val)
165 {
166  if (!xdr_setpos(&d_sink, 0))
167  throw Error(
168  "Network I/O Error. Could not send float 32 data - unable to set stream position.");
169 
170  if (!xdr_float(&d_sink, &val))
171  throw Error(
172  "Network I/O Error. Could not send float 32 data.");
173 
174  unsigned int bytes_written = xdr_getpos(&d_sink);
175  if (!bytes_written)
176  throw Error(
177  "Network I/O Error. Could not send float 32 data - unable to get stream position.");
178 
179 #ifdef USE_POSIX_THREADS
180  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
181 #endif
182 
183  d_out.write(d_buf, bytes_written);
184 }
185 
186 void XDRStreamMarshaller::put_float64(dods_float64 val)
187 {
188  if (!xdr_setpos(&d_sink, 0))
189  throw Error(
190  "Network I/O Error. Could not send float 64 data - unable to set stream position.");
191 
192  if (!xdr_double(&d_sink, &val))
193  throw Error(
194  "Network I/O Error. Could not send float 64 data.");
195 
196  unsigned int bytes_written = xdr_getpos(&d_sink);
197  if (!bytes_written)
198  throw Error(
199  "Network I/O Error. Could not send float 64 data - unable to get stream position.");
200 
201 #ifdef USE_POSIX_THREADS
202  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
203 #endif
204 
205  d_out.write(d_buf, bytes_written);
206 }
207 
208 void XDRStreamMarshaller::put_uint16(dods_uint16 val)
209 {
210  if (!xdr_setpos(&d_sink, 0))
211  throw Error(
212  "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
213 
214  if (!XDR_UINT16(&d_sink, &val))
215  throw Error(
216  "Network I/O Error. Could not send uint 16 data.");
217 
218  unsigned int bytes_written = xdr_getpos(&d_sink);
219  if (!bytes_written)
220  throw Error(
221  "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
222 
223 #ifdef USE_POSIX_THREADS
224  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
225 #endif
226 
227  d_out.write(d_buf, bytes_written);
228 }
229 
230 void XDRStreamMarshaller::put_uint32(dods_uint32 val)
231 {
232  if (!xdr_setpos(&d_sink, 0))
233  throw Error(
234  "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
235 
236  if (!XDR_UINT32(&d_sink, &val))
237  throw Error(
238  "Network I/O Error. Could not send uint 32 data.");
239 
240  unsigned int bytes_written = xdr_getpos(&d_sink);
241  if (!bytes_written)
242  throw Error(
243  "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
244 
245 #ifdef USE_POSIX_THREADS
246  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
247 #endif
248 
249  d_out.write(d_buf, bytes_written);
250 }
251 
252 void XDRStreamMarshaller::put_str(const string &val)
253 {
254  int size = val.length() + 8;
255 
256  XDR str_sink;
257  vector<char> str_buf(size);
258 
259  try {
260  xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);
261 
262  if (!xdr_setpos(&str_sink, 0))
263  throw Error(
264  "Network I/O Error. Could not send string data - unable to set stream position.");
265 
266  const char *out_tmp = val.c_str();
267  if (!xdr_string(&str_sink, (char **) &out_tmp, size))
268  throw Error(
269  "Network I/O Error. Could not send string data.");
270 
271  unsigned int bytes_written = xdr_getpos(&str_sink);
272  if (!bytes_written)
273  throw Error(
274  "Network I/O Error. Could not send string data - unable to get stream position.");
275 
276 #ifdef USE_POSIX_THREADS
277  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
278 #endif
279 
280  d_out.write(&str_buf[0], bytes_written);
281 
282  xdr_destroy(&str_sink);
283  }
284  catch (...) {
285  xdr_destroy(&str_sink);
286  throw;
287  }
288 }
289 
/**
 * Serialize a URL. A URL is sent exactly like a string: this simply forwards
 * to put_str().
 *
 * @param val The URL text to send.
 */
290 void XDRStreamMarshaller::put_url(const string &val)
291 {
292  put_str(val);
293 }
294 
295 void XDRStreamMarshaller::put_opaque(char *val, unsigned int len)
296 {
297  if (len > XDR_DAP_BUFF_SIZE)
298  throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
299 
300  if (!xdr_setpos(&d_sink, 0))
301  throw Error(
302  "Network I/O Error. Could not send opaque data - unable to set stream position.");
303 
304  if (!xdr_opaque(&d_sink, val, len))
305  throw Error(
306  "Network I/O Error. Could not send opaque data.");
307 
308  unsigned int bytes_written = xdr_getpos(&d_sink);
309  if (!bytes_written)
310  throw Error(
311  "Network I/O Error. Could not send opaque data - unable to get stream position.");
312 
313 #ifdef USE_POSIX_THREADS
314  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
315 #endif
316 
317  d_out.write(d_buf, bytes_written);
318 }
319 
320 void XDRStreamMarshaller::put_int(int val)
321 {
322  if (!xdr_setpos(&d_sink, 0))
323  throw Error(
324  "Network I/O Error. Could not send int data - unable to set stream position.");
325 
326  if (!xdr_int(&d_sink, &val))
327  throw Error(
328  "Network I/O Error(1). Could not send int data.");
329 
330  unsigned int bytes_written = xdr_getpos(&d_sink);
331  if (!bytes_written)
332  throw Error(
333  "Network I/O Error. Could not send int data - unable to get stream position.");
334 
335 #ifdef USE_POSIX_THREADS
336  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
337 #endif
338 
339  d_out.write(d_buf, bytes_written);
340 }
341 
/**
 * Serialize a vector of elements wider than one byte.
 *
 * This overload only looks up the element type from the Vector's template
 * variable (vec.var()->type()) and forwards to the put_vector() overload that
 * takes a Type.
 *
 * @param val Pointer to the element data.
 * @param num Number of elements.
 * @param width Size in bytes of one element.
 * @param vec The Vector being sent; used only to obtain the element type.
 */
342 void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec)
343 {
344  put_vector(val, num, width, vec.var()->type());
345 }
346 
347 
356 {
357  put_int(num);
358  put_int(num);
359 
360  d_partial_put_byte_count = 0;
361 }
362 
370 {
371 #ifdef USE_POSIX_THREADS
372  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
373 #endif
374 
375  // Compute the trailing (padding) bytes
376 
377  // Note that the XDR standard pads values to 4 byte boundaries.
378  //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
379  unsigned int mod_4 = d_partial_put_byte_count & 0x03;
380  unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
381 
382  if (pad) {
383  vector<char> padding(4, 0); // 4 zeros
384 
385  d_out.write(&padding[0], pad);
386  if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding");
387  }
388 }
389 
390 // Start of parallel I/O support. jhrg 8/19/15
391 void XDRStreamMarshaller::put_vector(char *val, int num, Vector &)
392 {
393  if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");
394 
395  // write the number of members of the array being written and then set the position to 0
396  put_int(num);
397 
398  // this is the word boundary for writing xdr bytes in a vector.
399  const unsigned int add_to = 8;
400  // switch to memory on the heap since the thread will need to access it
401  // after this code returns.
402  char *byte_buf = new char[num + add_to];
403  XDR byte_sink;
404  try {
405  xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
406  if (!xdr_setpos(&byte_sink, 0))
407  throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
408 
409  if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to))
410  throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
411 
412  unsigned int bytes_written = xdr_getpos(&byte_sink);
413  if (!bytes_written)
414  throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");
415 
416 #ifdef USE_POSIX_THREADS
417  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
418  tm->increment_child_thread_count();
419  tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
420  xdr_destroy(&byte_sink);
421 #else
422  d_out.write(byte_buf, bytes_written);
423  xdr_destroy(&byte_sink);
424  delete [] byte_buf;
425 #endif
426 
427  }
428  catch (...) {
429  DBG(cerr << "Caught an exception in put_vector_thread" << endl);
430  xdr_destroy(&byte_sink);
431  delete [] byte_buf;
432  throw;
433  }
434 }
435 
436 // private
447 void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type)
448 {
449  assert(val || num == 0);
450 
451  // write the number of array members being written, then set the position back to 0
452  put_int(num);
453 
454  if (num == 0)
455  return;
456 
457  int use_width = width;
458  if (use_width < 4) use_width = 4;
459 
460  // the size is the number of elements num times the width of each
461  // element, then add 4 bytes for the number of elements
462  int size = (num * use_width) + 4;
463 
464  // allocate enough memory for the elements
465  //vector<char> vec_buf(size);
466  char *vec_buf = new char[size];
467  XDR vec_sink;
468  try {
469  xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
470 
471  // set the position of the sink to 0, we're starting at the beginning
472  if (!xdr_setpos(&vec_sink, 0))
473  throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
474 
475  // write the array to the buffer
476  if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
477  throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");
478 
479  // how much was written to the buffer
480  unsigned int bytes_written = xdr_getpos(&vec_sink);
481  if (!bytes_written)
482  throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");
483 
484 #ifdef USE_POSIX_THREADS
485  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
486  tm->increment_child_thread_count();
487  tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
488  xdr_destroy(&vec_sink);
489 #else
490  d_out.write(vec_buf, bytes_written);
491  xdr_destroy(&vec_sink);
492  delete [] vec_buf;
493 #endif
494  }
495  catch (...) {
496  xdr_destroy(&vec_sink);
497  delete [] vec_buf;
498  throw;
499  }
500 }
501 
513 void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
514 {
515  if (width == 1) {
516  // Add space for the 4 bytes of length info and 4 bytes for padding, even though
517  // we will not send either of those.
518  const unsigned int add_to = 8;
519  unsigned int bufsiz = num + add_to;
520  //vector<char> byte_buf(bufsiz);
521  char *byte_buf = new char[bufsiz];
522  XDR byte_sink;
523  try {
524  xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
525  if (!xdr_setpos(&byte_sink, 0))
526  throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
527 
528  if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz))
529  throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
530 
531 #ifdef USE_POSIX_THREADS
532  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
533  tm->increment_child_thread_count();
534 
535  // Increment the element count so we can figure out about the padding in put_vector_last()
536  d_partial_put_byte_count += num;
537 
538  tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
539  xdr_destroy(&byte_sink);
540 #else
541  // Only send the num bytes that follow the 4 bytes of length info - we skip the
542  // length info because it's already been sent and we don't send any trailing padding
543  // bytes in this method (see put_vector_last() for that).
544  d_out.write(byte_buf + 4, num);
545 
546  if (d_out.fail())
547  throw Error ("Network I/O Error. Could not send initial part of byte vector data");
548 
549  // Now increment the element count so we can figure out about the padding in put_vector_last()
550  d_partial_put_byte_count += num;
551 
552  xdr_destroy(&byte_sink);
553  delete [] byte_buf;
554 #endif
555  }
556  catch (...) {
557  xdr_destroy(&byte_sink);
558  delete [] byte_buf;
559  throw;
560  }
561  }
562  else {
563  int use_width = (width < 4) ? 4 : width;
564 
565  // the size is the number of elements num times the width of each
566  // element, then add 4 bytes for the (int) number of elements
567  int size = (num * use_width) + 4;
568 
569  // allocate enough memory for the elements
570  //vector<char> vec_buf(size);
571  char *vec_buf = new char[size];
572  XDR vec_sink;
573  try {
574  xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
575 
576  // set the position of the sink to 0, we're starting at the beginning
577  if (!xdr_setpos(&vec_sink, 0))
578  throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
579 
580  // write the array to the buffer
581  if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
582  throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");
583 
584 #ifdef USE_POSIX_THREADS
585  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
586  tm->increment_child_thread_count();
587 
588  // Increment the element count so we can figure out about the padding in put_vector_last()
589  d_partial_put_byte_count += (size - 4);
590  tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
591  xdr_destroy(&vec_sink);
592 #else
593  // write that much out to the output stream, skipping the length data that
594  // XDR writes since we have already written the length info using put_vector_start()
595  d_out.write(vec_buf + 4, size - 4);
596 
597  if (d_out.fail())
598  throw Error ("Network I/O Error. Could not send part of vector data");
599 
600  // Now increment the element count so we can figure out about the padding in put_vector_last()
601  d_partial_put_byte_count += (size - 4);
602 
603  xdr_destroy(&vec_sink);
604  delete [] vec_buf;
605 #endif
606  }
607  catch (...) {
608  xdr_destroy(&vec_sink);
609  delete [] vec_buf;
610  throw;
611  }
612  }
613 }
614 
/**
 * Write a one-line description of this object to a stream: the current
 * DapIndent left margin, the text "XDRStreamMarshaller::dump", and this
 * instance's address.
 *
 * @param strm Stream that receives the description.
 */
615 void XDRStreamMarshaller::dump(ostream &strm) const
616 {
617  strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl;
618 }
619 
620 } // namespace libdap
621 
static void * write_thread(void *arg)
Holds a one-dimensional collection of DAP2 data types.
Definition: Vector.h:80
virtual void put_vector_part(char *val, unsigned int num, int width, Type type)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
STL namespace.
virtual void put_vector_start(int num)
Type
Identifies the data type.
Definition: Type.h:94
A class for software fault reporting.
Definition: InternalErr.h:64
virtual void dump(ostream &strm) const
dump the contents of this object to the specified ostream
static void * write_thread_part(void *arg)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
Definition: XDRUtils.cc:145
A class for error processing.
Definition: Error.h:90