49#include "XDRStreamMarshaller.h"
50#ifdef USE_POSIX_THREADS
51#include "MarshallerThread.h"
67char *XDRStreamMarshaller::d_buf = 0;
68static const int XDR_DAP_BUFF_SIZE=256;
79XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
80 d_out(out), d_partial_put_byte_count(0), tm(0)
82 if (!d_buf) d_buf = (
char *) malloc(XDR_DAP_BUFF_SIZE);
83 if (!d_buf)
throw Error(internal_error,
"Failed to allocate memory for data serialization.");
85 xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
87#ifdef USE_POSIX_THREADS
92XDRStreamMarshaller::~XDRStreamMarshaller()
99void XDRStreamMarshaller::put_byte(dods_byte val)
101 if (!xdr_setpos(&d_sink, 0))
102 throw Error(
"Network I/O Error. Could not send byte data - unable to set stream position.");
104 if (!xdr_char(&d_sink, (
char *) &val))
106 "Network I/O Error. Could not send byte data.");
108 unsigned int bytes_written = xdr_getpos(&d_sink);
111 "Network I/O Error. Could not send byte data - unable to get stream position.");
113#ifdef USE_POSIX_THREADS
114 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
117 d_out.write(d_buf, bytes_written);
120void XDRStreamMarshaller::put_int16(dods_int16 val)
122 if (!xdr_setpos(&d_sink, 0))
124 "Network I/O Error. Could not send int 16 data - unable to set stream position.");
126 if (!XDR_INT16(&d_sink, &val))
128 "Network I/O Error. Could not send int 16 data.");
130 unsigned int bytes_written = xdr_getpos(&d_sink);
133 "Network I/O Error. Could not send int 16 data - unable to get stream position.");
135#ifdef USE_POSIX_THREADS
136 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
139 d_out.write(d_buf, bytes_written);
142void XDRStreamMarshaller::put_int32(dods_int32 val)
144 if (!xdr_setpos(&d_sink, 0))
146 "Network I/O Error. Could not send int 32 data - unable to set stream position.");
148 if (!XDR_INT32(&d_sink, &val))
150 "Network I/O Error. Culd not read int 32 data.");
152 unsigned int bytes_written = xdr_getpos(&d_sink);
155 "Network I/O Error. Could not send int 32 data - unable to get stream position.");
157#ifdef USE_POSIX_THREADS
158 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
161 d_out.write(d_buf, bytes_written);
164void XDRStreamMarshaller::put_float32(dods_float32 val)
166 if (!xdr_setpos(&d_sink, 0))
168 "Network I/O Error. Could not send float 32 data - unable to set stream position.");
170 if (!xdr_float(&d_sink, &val))
172 "Network I/O Error. Could not send float 32 data.");
174 unsigned int bytes_written = xdr_getpos(&d_sink);
177 "Network I/O Error. Could not send float 32 data - unable to get stream position.");
179#ifdef USE_POSIX_THREADS
180 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
183 d_out.write(d_buf, bytes_written);
186void XDRStreamMarshaller::put_float64(dods_float64 val)
188 if (!xdr_setpos(&d_sink, 0))
190 "Network I/O Error. Could not send float 64 data - unable to set stream position.");
192 if (!xdr_double(&d_sink, &val))
194 "Network I/O Error. Could not send float 64 data.");
196 unsigned int bytes_written = xdr_getpos(&d_sink);
199 "Network I/O Error. Could not send float 64 data - unable to get stream position.");
201#ifdef USE_POSIX_THREADS
202 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
205 d_out.write(d_buf, bytes_written);
208void XDRStreamMarshaller::put_uint16(dods_uint16 val)
210 if (!xdr_setpos(&d_sink, 0))
212 "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
214 if (!XDR_UINT16(&d_sink, &val))
216 "Network I/O Error. Could not send uint 16 data.");
218 unsigned int bytes_written = xdr_getpos(&d_sink);
221 "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
223#ifdef USE_POSIX_THREADS
224 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
227 d_out.write(d_buf, bytes_written);
230void XDRStreamMarshaller::put_uint32(dods_uint32 val)
232 if (!xdr_setpos(&d_sink, 0))
234 "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
236 if (!XDR_UINT32(&d_sink, &val))
238 "Network I/O Error. Could not send uint 32 data.");
240 unsigned int bytes_written = xdr_getpos(&d_sink);
243 "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
245#ifdef USE_POSIX_THREADS
246 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
249 d_out.write(d_buf, bytes_written);
252void XDRStreamMarshaller::put_str(
const string &val)
254 int size = val.length() + 8;
257 vector<char> str_buf(size);
260 xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);
262 if (!xdr_setpos(&str_sink, 0))
264 "Network I/O Error. Could not send string data - unable to set stream position.");
266 const char *out_tmp = val.c_str();
267 if (!xdr_string(&str_sink, (
char **) &out_tmp, size))
269 "Network I/O Error. Could not send string data.");
271 unsigned int bytes_written = xdr_getpos(&str_sink);
274 "Network I/O Error. Could not send string data - unable to get stream position.");
276#ifdef USE_POSIX_THREADS
277 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
280 d_out.write(&str_buf[0], bytes_written);
282 xdr_destroy(&str_sink);
285 xdr_destroy(&str_sink);
290void XDRStreamMarshaller::put_url(
const string &val)
295void XDRStreamMarshaller::put_opaque(
char *val,
unsigned int len)
297 if (len > XDR_DAP_BUFF_SIZE)
298 throw Error(
"Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
300 if (!xdr_setpos(&d_sink, 0))
302 "Network I/O Error. Could not send opaque data - unable to set stream position.");
304 if (!xdr_opaque(&d_sink, val, len))
306 "Network I/O Error. Could not send opaque data.");
308 unsigned int bytes_written = xdr_getpos(&d_sink);
311 "Network I/O Error. Could not send opaque data - unable to get stream position.");
313#ifdef USE_POSIX_THREADS
314 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
317 d_out.write(d_buf, bytes_written);
320void XDRStreamMarshaller::put_int(
int val)
322 if (!xdr_setpos(&d_sink, 0))
324 "Network I/O Error. Could not send int data - unable to set stream position.");
326 if (!xdr_int(&d_sink, &val))
328 "Network I/O Error(1). Could not send int data.");
330 unsigned int bytes_written = xdr_getpos(&d_sink);
333 "Network I/O Error. Could not send int data - unable to get stream position.");
335#ifdef USE_POSIX_THREADS
336 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
339 d_out.write(d_buf, bytes_written);
342void XDRStreamMarshaller::put_vector(
char *val,
int num,
int width, Vector &vec)
344 put_vector(val, num, width, vec.var()->type());
360 d_partial_put_byte_count = 0;
371#ifdef USE_POSIX_THREADS
372 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
379 unsigned int mod_4 = d_partial_put_byte_count & 0x03;
380 unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
383 vector<char> padding(4, 0);
385 d_out.write(&padding[0], pad);
386 if (d_out.fail())
throw Error(
"Network I/O Error. Could not send vector data padding");
391void XDRStreamMarshaller::put_vector(
char *val,
int num,
Vector &)
393 if (!val)
throw InternalErr(__FILE__, __LINE__,
"Could not send byte vector data. Buffer pointer is not set.");
399 const unsigned int add_to = 8;
402 char *byte_buf =
new char[num + add_to];
405 xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
406 if (!xdr_setpos(&byte_sink, 0))
407 throw Error(
"Network I/O Error. Could not send byte vector data - unable to set stream position.");
409 if (!xdr_bytes(&byte_sink, (
char **) &val, (
unsigned int *) &num, num + add_to))
410 throw Error(
"Network I/O Error(2). Could not send byte vector data - unable to encode data.");
412 unsigned int bytes_written = xdr_getpos(&byte_sink);
414 throw Error(
"Network I/O Error. Could not send byte vector data - unable to get stream position.");
416#ifdef USE_POSIX_THREADS
417 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
418 tm->increment_child_thread_count();
420 xdr_destroy(&byte_sink);
422 d_out.write(byte_buf, bytes_written);
423 xdr_destroy(&byte_sink);
429 DBG(cerr <<
"Caught an exception in put_vector_thread" << endl);
430 xdr_destroy(&byte_sink);
447void XDRStreamMarshaller::put_vector(
char *val,
unsigned int num,
int width,
Type type)
449 assert(val || num == 0);
457 int use_width = width;
458 if (use_width < 4) use_width = 4;
462 int size = (num * use_width) + 4;
466 char *vec_buf =
new char[size];
469 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
472 if (!xdr_setpos(&vec_sink, 0))
473 throw Error(
"Network I/O Error. Could not send vector data - unable to set stream position.");
476 if (!xdr_array(&vec_sink, (
char **) &val, (
unsigned int *) &num, size, width,
XDRUtils::xdr_coder(type)))
477 throw Error(
"Network I/O Error(2). Could not send vector data - unable to encode.");
480 unsigned int bytes_written = xdr_getpos(&vec_sink);
482 throw Error(
"Network I/O Error. Could not send vector data - unable to get stream position.");
484#ifdef USE_POSIX_THREADS
485 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
486 tm->increment_child_thread_count();
488 xdr_destroy(&vec_sink);
490 d_out.write(vec_buf, bytes_written);
491 xdr_destroy(&vec_sink);
496 xdr_destroy(&vec_sink);
518 const unsigned int add_to = 8;
519 unsigned int bufsiz = num + add_to;
521 char *byte_buf =
new char[bufsiz];
524 xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
525 if (!xdr_setpos(&byte_sink, 0))
526 throw Error(
"Network I/O Error. Could not send byte vector data - unable to set stream position.");
528 if (!xdr_bytes(&byte_sink, (
char **) &val, (
unsigned int *) &num, bufsiz))
529 throw Error(
"Network I/O Error(2). Could not send byte vector data - unable to encode data.");
531#ifdef USE_POSIX_THREADS
532 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
533 tm->increment_child_thread_count();
536 d_partial_put_byte_count += num;
539 xdr_destroy(&byte_sink);
544 d_out.write(byte_buf + 4, num);
547 throw Error (
"Network I/O Error. Could not send initial part of byte vector data");
550 d_partial_put_byte_count += num;
552 xdr_destroy(&byte_sink);
557 xdr_destroy(&byte_sink);
563 int use_width = (width < 4) ? 4 : width;
567 int size = (num * use_width) + 4;
571 char *vec_buf =
new char[size];
574 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
577 if (!xdr_setpos(&vec_sink, 0))
578 throw Error(
"Network I/O Error. Could not send vector data - unable to set stream position.");
581 if (!xdr_array(&vec_sink, (
char **) &val, (
unsigned int *) &num, size, width,
XDRUtils::xdr_coder(type)))
582 throw Error(
"Network I/O Error(2). Could not send vector data -unable to encode data.");
584#ifdef USE_POSIX_THREADS
585 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
586 tm->increment_child_thread_count();
589 d_partial_put_byte_count += (size - 4);
591 xdr_destroy(&vec_sink);
595 d_out.write(vec_buf + 4, size - 4);
598 throw Error (
"Network I/O Error. Could not send part of vector data");
601 d_partial_put_byte_count += (size - 4);
603 xdr_destroy(&vec_sink);
608 xdr_destroy(&vec_sink);
617 strm << DapIndent::LMarg <<
"XDRStreamMarshaller::dump - (" << (
void *)
this <<
")" << endl;
A class for error processing.
A class for software fault reporting.
static void * write_thread(void *arg)
static void * write_thread_part(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
Holds a one-dimensional collection of DAP2 data types.
virtual void dump(ostream &strm) const
dump the contents of this object to the specified ostream
virtual void put_vector_end()
virtual void put_vector_start(int num)
virtual void put_vector_part(char *val, unsigned int num, int width, Type type)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
Type
Identifies the data type.