Branch data Line data Source code
1 : : /** @file remoteconnection.cc
2 : : * @brief RemoteConnection class used by the remote backend.
3 : : */
4 : : /* Copyright (C) 2006,2007,2008,2009,2010,2011 Olly Betts
5 : : *
6 : : * This program is free software; you can redistribute it and/or modify
7 : : * it under the terms of the GNU General Public License as published by
8 : : * the Free Software Foundation; either version 2 of the License, or
9 : : * (at your option) any later version.
10 : : *
11 : : * This program is distributed in the hope that it will be useful,
12 : : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : : * GNU General Public License for more details.
15 : : *
16 : : * You should have received a copy of the GNU General Public License
17 : : * along with this program; if not, write to the Free Software
18 : : * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 : : */
20 : :
21 : : #include <config.h>
22 : :
23 : : #include "remoteconnection.h"
24 : :
25 : : #include <xapian/error.h>
26 : :
27 : : #include "safeerrno.h"
28 : : #include "safefcntl.h"
29 : : #include "safeunistd.h"
30 : :
31 : : #include <algorithm>
32 : : #include <string>
33 : :
34 : : #include "debuglog.h"
35 : : #include "omassert.h"
36 : : #include "realtime.h"
37 : : #include "serialise.h"
38 : : #include "socket_utils.h"
39 : : #include "utils.h"
40 : :
41 : : #ifndef __WIN32__
42 : : # include "safesysselect.h"
43 : : #else
44 : : # include "msvc_posix_wrapper.h"
45 : : #endif
46 : :
47 : : using namespace std;
48 : :
49 : : #define CHUNKSIZE 4096
50 : :
51 : : #ifdef __WIN32__
52 : : inline void
53 : : update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
54 : : {
55 : : STATIC_ASSERT_UNSIGNED_TYPE(DWORD); // signed overflow is undefined.
56 : : overlapped.Offset += n;
57 : : if (overlapped.Offset < n) ++overlapped.OffsetHigh;
58 : : }
59 : : #endif
60 : :
61 : 3338 : RemoteConnection::RemoteConnection(int fdin_, int fdout_,
62 : : const string & context_)
63 : 3338 : : fdin(fdin_), fdout(fdout_), context(context_)
64 : : {
65 : : #ifdef __WIN32__
66 : : memset(&overlapped, 0, sizeof(overlapped));
67 : : overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
68 : : if (!overlapped.hEvent)
69 : : throw Xapian::NetworkError("Failed to setup OVERLAPPED",
70 : : context, -(int)GetLastError());
71 : :
72 : : #endif
73 : 3338 : }
74 : :
75 : 3338 : RemoteConnection::~RemoteConnection()
76 : : {
77 : : #ifdef __WIN32__
78 : : if (overlapped.hEvent)
79 : : CloseHandle(overlapped.hEvent);
80 : : #endif
81 : 3338 : }
82 : :
83 : : void
84 : 13914056 : RemoteConnection::read_at_least(size_t min_len, double end_time)
85 : : {
86 : : LOGCALL_VOID(REMOTE, "RemoteConnection::read_at_least", min_len | end_time);
87 : :
88 [ + + ]: 13914056 : if (buffer.length() >= min_len) return;
89 : :
90 : : #ifdef __WIN32__
91 : : HANDLE hin = fd_to_handle(fdin);
92 : : do {
93 : : char buf[CHUNKSIZE];
94 : : DWORD received;
95 : : BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
96 : : if (!ok) {
97 : : int errcode = GetLastError();
98 : : if (errcode != ERROR_IO_PENDING)
99 : : throw Xapian::NetworkError("read failed", context, -errcode);
100 : : // Is asynch - just wait for the data to be received or a timeout.
101 : : DWORD waitrc;
102 : : waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
103 : : if (waitrc != WAIT_OBJECT_0) {
104 : : LOGLINE(REMOTE, "read: timeout has expired");
105 : : throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
106 : : }
107 : : // Get the final result of the read.
108 : : if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
109 : : throw Xapian::NetworkError("Failed to get overlapped result",
110 : : context, -(int)GetLastError());
111 : : }
112 : :
113 : : if (received == 0)
114 : : throw Xapian::NetworkError("Received EOF", context);
115 : :
116 : : buffer.append(buf, received);
117 : :
118 : : // We must update the offset in the OVERLAPPED structure manually.
119 : : update_overlapped_offset(overlapped, received);
120 : : } while (buffer.length() < min_len);
121 : : #else
122 : : // If there's no end_time, just use blocking I/O.
123 [ + + ][ - + ]: 8422673 : if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
124 : : throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
125 : 0 : context, errno);
126 : : }
127 : :
128 : 17267739 : while (true) {
129 : : char buf[CHUNKSIZE];
130 : 11777612 : ssize_t received = read(fdin, buf, sizeof(buf));
131 : :
132 [ + + ]: 11777612 : if (received > 0) {
133 : 8422067 : buffer.append(buf, received);
134 [ + + ]: 8422067 : if (buffer.length() >= min_len) return;
135 : 650 : continue;
136 : : }
137 : :
138 [ + + ]: 3355545 : if (received == 0)
139 : 1250 : throw Xapian::NetworkError("Received EOF", context);
140 : :
141 : : LOGLINE(REMOTE, "read gave errno = " << strerror(errno));
142 [ - + ]: 3354295 : if (errno == EINTR) continue;
143 : :
144 [ - + ]: 3354295 : if (errno != EAGAIN)
145 : 0 : throw Xapian::NetworkError("read failed", context, errno);
146 : :
147 : : Assert(end_time != 0.0);
148 : 31 : while (true) {
149 : : // Calculate how far in the future end_time is.
150 : 3354326 : double time_diff = end_time - RealTime::now();
151 : : // Check if the timeout has expired.
152 [ - + ]: 3354326 : if (time_diff < 0) {
153 : : LOGLINE(REMOTE, "read: timeout has expired");
154 : 0 : throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
155 : : }
156 : :
157 : : // Use select to wait until there is data or the timeout is reached.
158 : : fd_set fdset;
159 : 3354326 : FD_ZERO(&fdset);
160 : 3354326 : FD_SET(fdin, &fdset);
161 : :
162 : : struct timeval tv;
163 : 3354326 : tv.tv_sec = long(time_diff);
164 : 3354326 : tv.tv_usec = long(fmod(time_diff, 1.0) * 1000000);
165 : :
166 : 3354326 : int select_result = select(fdin + 1, &fdset, 0, &fdset, &tv);
167 [ + + ]: 3354326 : if (select_result > 0) break;
168 : :
169 [ + + ]: 37 : if (select_result == 0)
170 : 6 : throw Xapian::NetworkTimeoutError("Timeout expired while trying to read", context);
171 : :
172 : : // EINTR means select was interrupted by a signal.
173 [ - + ]: 31 : if (errno != EINTR)
174 : 0 : throw Xapian::NetworkError("select failed during read", context, errno);
175 : : }
176 : : }
177 : : #endif
178 : : }
179 : :
180 : : bool
181 : 4404 : RemoteConnection::ready_to_read() const
182 : : {
183 : : LOGCALL(REMOTE, bool, "RemoteConnection::ready_to_read", NO_ARGS);
184 [ - + ]: 4404 : if (fdin == -1) {
185 : 0 : throw Xapian::DatabaseError("Database has been closed");
186 : : }
187 : :
188 [ - + ]: 4404 : if (!buffer.empty()) RETURN(true);
189 : :
190 : : // Use select to see if there's data available to be read.
191 : : fd_set fdset;
192 : 4404 : FD_ZERO(&fdset);
193 : 4404 : FD_SET(fdin, &fdset);
194 : :
195 : : // Set a 0.1 second timeout to avoid a busy loop.
196 : : // FIXME: this would be much better done by exposing the fd so that the
197 : : // matcher can call select on all the fds involved...
198 : : struct timeval tv;
199 : 4404 : tv.tv_sec = 0;
200 : 4404 : tv.tv_usec = 100000;
201 : 4404 : RETURN(select(fdin + 1, &fdset, 0, &fdset, &tv) > 0);
202 : : }
203 : :
204 : : void
205 : 6643752 : RemoteConnection::send_message(char type, const string &message,
206 : : double end_time)
207 : : {
208 : : LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
209 [ + + ]: 6643752 : if (fdout == -1) {
210 : 72 : throw Xapian::DatabaseError("Database has been closed");
211 : : }
212 : :
213 : 6643680 : string header;
214 : 6643680 : header += type;
215 : 6643680 : header += encode_length(message.size());
216 : :
217 : : #ifdef __WIN32__
218 : : HANDLE hout = fd_to_handle(fdout);
219 : : const string * str = &header;
220 : :
221 : : size_t count = 0;
222 : : while (true) {
223 : : DWORD n;
224 : : BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
225 : : if (!ok) {
226 : : int errcode = GetLastError();
227 : : if (errcode != ERROR_IO_PENDING)
228 : : throw Xapian::NetworkError("write failed", context, -errcode);
229 : : // Just wait for the data to be received, or a timeout.
230 : : DWORD waitrc;
231 : : waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
232 : : if (waitrc != WAIT_OBJECT_0) {
233 : : LOGLINE(REMOTE, "write: timeout has expired");
234 : : throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
235 : : }
236 : : // Get the final result.
237 : : if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
238 : : throw Xapian::NetworkError("Failed to get overlapped result",
239 : : context, -(int)GetLastError());
240 : : }
241 : :
242 : : count += n;
243 : :
244 : : // We must update the offset in the OVERLAPPED structure manually.
245 : : update_overlapped_offset(overlapped, n);
246 : :
247 : : if (count == str->size()) {
248 : : if (str == &message || message.empty()) return;
249 : : str = &message;
250 : : count = 0;
251 : : }
252 : : }
253 : : #else
254 : : // If there's no end_time, just use blocking I/O.
255 [ + + ][ - + ]: 6643680 : if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
256 : : throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
257 : 0 : context, errno);
258 : : }
259 : :
260 : 6643680 : const string * str = &header;
261 : :
262 : : fd_set fdset;
263 : 6643680 : size_t count = 0;
264 : 6046387 : while (true) {
265 : : // We've set write to non-blocking, so just try writing as there
266 : : // will usually be space.
267 : 12690067 : ssize_t n = write(fdout, str->data() + count, str->size() - count);
268 : :
269 [ + + ]: 12690067 : if (n >= 0) {
270 : 12690060 : count += n;
271 [ + - ]: 12690060 : if (count == str->size()) {
272 [ + + ][ + + ]: 12690060 : if (str == &message || message.empty()) return;
[ + + ]
273 : 6046386 : str = &message;
274 : 6046386 : count = 0;
275 : : }
276 : 6046386 : continue;
277 : : }
278 : :
279 : : LOGLINE(REMOTE, "write gave errno = " << strerror(errno));
280 [ - + ]: 7 : if (errno == EINTR) continue;
281 : :
282 [ + + ]: 7 : if (errno != EAGAIN)
283 : 6 : throw Xapian::NetworkError("write failed", context, errno);
284 : :
285 : : // Use select to wait until there is space or the timeout is reached.
286 : 1 : FD_ZERO(&fdset);
287 : 1 : FD_SET(fdout, &fdset);
288 : :
289 : 1 : double time_diff = end_time - RealTime::now();
290 [ - + ]: 1 : if (time_diff < 0) {
291 : : LOGLINE(REMOTE, "write: timeout has expired");
292 : 0 : throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
293 : : }
294 : :
295 : : struct timeval tv;
296 : 1 : tv.tv_sec = long(time_diff);
297 : 1 : tv.tv_usec = long(fmod(time_diff, 1.0) * 1000000);
298 : :
299 : 1 : int select_result = select(fdout + 1, 0, &fdset, &fdset, &tv);
300 : :
301 [ - + ]: 1 : if (select_result < 0) {
302 [ # # ]: 0 : if (errno == EINTR) {
303 : : // EINTR means select was interrupted by a signal.
304 : : // We could just retry the select, but it's easier to just
305 : : // retry the write.
306 : 0 : continue;
307 : : }
308 : 0 : throw Xapian::NetworkError("select failed during write", context, errno);
309 : : }
310 : :
311 [ - + ]: 1 : if (select_result == 0)
312 : 0 : throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
313 : 6643680 : }
314 : : #endif
315 : : }
316 : :
317 : : void
318 : 328 : RemoteConnection::send_file(char type, int fd, double end_time)
319 : : {
320 : : LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
321 [ - + ]: 328 : if (fdout == -1) {
322 : 0 : throw Xapian::DatabaseError("Database has been closed");
323 : : }
324 : :
325 : : off_t size;
326 : : {
327 : : struct stat sb;
328 [ - + ]: 328 : if (fstat(fd, &sb) == -1)
329 : 0 : throw Xapian::NetworkError("Couldn't stat file to send", errno);
330 : 328 : size = sb.st_size;
331 : : }
332 : : // FIXME: Use sendfile() or similar if available?
333 : :
334 : : char buf[CHUNKSIZE];
335 : 328 : buf[0] = type;
336 : 328 : size_t c = 1;
337 : : {
338 : 328 : string enc_size = encode_length(size);
339 : 328 : c += enc_size.size();
340 : : // An encoded length should be just a few bytes.
341 : : AssertRel(c, <=, sizeof(buf));
342 : 328 : memcpy(buf + 1, enc_size.data(), enc_size.size());
343 : : }
344 : :
345 : : #ifdef __WIN32__
346 : : HANDLE hout = fd_to_handle(fdout);
347 : : size_t count = 0;
348 : : while (true) {
349 : : DWORD n;
350 : : BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
351 : : if (!ok) {
352 : : int errcode = GetLastError();
353 : : if (errcode != ERROR_IO_PENDING)
354 : : throw Xapian::NetworkError("write failed", context, -errcode);
355 : : // Just wait for the data to be received, or a timeout.
356 : : DWORD waitrc;
357 : : waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
358 : : if (waitrc != WAIT_OBJECT_0) {
359 : : LOGLINE(REMOTE, "write: timeout has expired");
360 : : throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
361 : : }
362 : : // Get the final result.
363 : : if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
364 : : throw Xapian::NetworkError("Failed to get overlapped result",
365 : : context, -(int)GetLastError());
366 : : }
367 : :
368 : : count += n;
369 : :
370 : : // We must update the offset in the OVERLAPPED structure manually.
371 : : update_overlapped_offset(overlapped, n);
372 : :
373 : : if (count == c) {
374 : : if (size == 0) return;
375 : :
376 : : ssize_t res;
377 : : do {
378 : : res = read(fd, buf, sizeof(buf));
379 : : } while (res < 0 && errno == EINTR);
380 : : if (res < 0) throw Xapian::NetworkError("read failed", errno);
381 : : c = size_t(res);
382 : :
383 : : size -= c;
384 : : count = 0;
385 : : }
386 : : }
387 : : #else
388 : : // If there's no end_time, just use blocking I/O.
389 [ - + ][ - + ]: 328 : if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
390 : : throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
391 : 0 : context, errno);
392 : : }
393 : :
394 : : fd_set fdset;
395 : 328 : size_t count = 0;
396 : 392 : while (true) {
397 : : // We've set write to non-blocking, so just try writing as there
398 : : // will usually be space.
399 : 720 : ssize_t n = write(fdout, buf + count, c - count);
400 : :
401 [ + - ]: 720 : if (n >= 0) {
402 : 720 : count += n;
403 [ + - ]: 720 : if (count == c) {
404 [ + + ]: 720 : if (size == 0) return;
405 : :
406 : : ssize_t res;
407 [ - + # # ]: 392 : do {
[ - + ]
408 : 392 : res = read(fd, buf, sizeof(buf));
409 : : } while (res < 0 && errno == EINTR);
410 [ - + ]: 392 : if (res < 0) throw Xapian::NetworkError("read failed", errno);
411 : 392 : c = size_t(res);
412 : :
413 : 392 : size -= c;
414 : 392 : count = 0;
415 : : }
416 : 392 : continue;
417 : : }
418 : :
419 : : LOGLINE(REMOTE, "write gave errno = " << strerror(errno));
420 [ # # ]: 0 : if (errno == EINTR) continue;
421 : :
422 [ # # ]: 0 : if (errno != EAGAIN)
423 : 0 : throw Xapian::NetworkError("write failed", context, errno);
424 : :
425 : : // Use select to wait until there is space or the timeout is reached.
426 : 0 : FD_ZERO(&fdset);
427 : 0 : FD_SET(fdout, &fdset);
428 : :
429 : 0 : double time_diff = end_time - RealTime::now();
430 [ # # ]: 0 : if (time_diff < 0) {
431 : : LOGLINE(REMOTE, "write: timeout has expired");
432 : 0 : throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
433 : : }
434 : :
435 : : struct timeval tv;
436 : 0 : tv.tv_sec = long(time_diff);
437 : 0 : tv.tv_usec = long(fmod(time_diff, 1.0) * 1000000);
438 : :
439 : 0 : int select_result = select(fdout + 1, 0, &fdset, &fdset, &tv);
440 : :
441 [ # # ]: 0 : if (select_result < 0) {
442 [ # # ]: 0 : if (errno == EINTR) {
443 : : // EINTR means select was interrupted by a signal.
444 : : // We could just retry the select, but it's easier to just
445 : : // retry the write.
446 : 0 : continue;
447 : : }
448 : 0 : throw Xapian::NetworkError("select failed during write", context, errno);
449 : : }
450 : :
451 [ # # ]: 0 : if (select_result == 0)
452 : 0 : throw Xapian::NetworkTimeoutError("Timeout expired while trying to write", context);
453 : : }
454 : : #endif
455 : : }
456 : :
457 : : char
458 : 836 : RemoteConnection::sniff_next_message_type(double end_time)
459 : : {
460 : : LOGCALL(REMOTE, char, "RemoteConnection::sniff_next_message_type", end_time);
461 [ - + ]: 836 : if (fdin == -1) {
462 : 0 : throw Xapian::DatabaseError("Database has been closed");
463 : : }
464 : :
465 : 836 : read_at_least(1, end_time);
466 : 833 : char type = buffer[0];
467 : 833 : RETURN(type);
468 : : }
469 : :
470 : : char
471 : 6644805 : RemoteConnection::get_message(string &result, double end_time)
472 : : {
473 : : LOGCALL(REMOTE, char, "RemoteConnection::get_message", result | end_time);
474 [ - + ]: 6644805 : if (fdin == -1) {
475 : 0 : throw Xapian::DatabaseError("Database has been closed");
476 : : }
477 : :
478 : 6644805 : read_at_least(2, end_time);
479 : 6643665 : size_t len = static_cast<unsigned char>(buffer[1]);
480 : 6643665 : read_at_least(len + 2, end_time);
481 [ + + ]: 6643665 : if (len != 0xff) {
482 : 6020347 : result.assign(buffer.data() + 2, len);
483 : 6020347 : char type = buffer[0];
484 : 6020347 : buffer.erase(0, len + 2);
485 : 6020347 : RETURN(type);
486 : : }
487 : 623318 : len = 0;
488 : 623318 : string::const_iterator i = buffer.begin() + 2;
489 : : unsigned char ch;
490 : 623318 : int shift = 0;
491 [ + + ]: 1103696 : do {
492 [ + - ][ - + ]: 1103696 : if (i == buffer.end() || shift > 28) {
[ - + ]
493 : : // Something is very wrong...
494 : 0 : throw Xapian::NetworkError("Insane message length specified!");
495 : : }
496 : 1103696 : ch = *i++;
497 : 1103696 : len |= size_t(ch & 0x7f) << shift;
498 : 1103696 : shift += 7;
499 : : } while ((ch & 0x80) == 0);
500 : 623318 : len += 255;
501 : 623318 : size_t header_len = (i - buffer.begin());
502 : 623318 : read_at_least(header_len + len, end_time);
503 : 623318 : result.assign(buffer.data() + header_len, len);
504 : 623318 : char type = buffer[0];
505 : 623318 : buffer.erase(0, header_len + len);
506 : 6643665 : RETURN(type);
507 : : }
508 : :
509 : : char
510 : 151 : RemoteConnection::get_message_chunked(double end_time)
511 : : {
512 : : LOGCALL(REMOTE, char, "RemoteConnection::get_message_chunked", end_time);
513 [ - + ]: 151 : if (fdin == -1) {
514 : 0 : throw Xapian::DatabaseError("Database has been closed");
515 : : }
516 : :
517 : 151 : read_at_least(2, end_time);
518 : 151 : off_t len = static_cast<unsigned char>(buffer[1]);
519 [ - + ]: 151 : if (len != 0xff) {
520 : 0 : chunked_data_left = len;
521 : 0 : char type = buffer[0];
522 : 0 : buffer.erase(0, 2);
523 : 0 : RETURN(type);
524 : : }
525 : 151 : read_at_least(len + 2, end_time);
526 : 88 : len = 0;
527 : 88 : string::const_iterator i = buffer.begin() + 2;
528 : : unsigned char ch;
529 : 88 : int shift = 0;
530 [ + + ]: 176 : do {
531 : : // Allow a full 64 bits for message lengths - anything longer than that
532 : : // is almost certainly a corrupt value.
533 [ + - ][ - + ]: 176 : if (i == buffer.end() || shift > 63) {
[ - + ]
534 : : // Something is very wrong...
535 : 0 : throw Xapian::NetworkError("Insane message length specified!");
536 : : }
537 : 176 : ch = *i++;
538 : 176 : len |= off_t(ch & 0x7f) << shift;
539 : 176 : shift += 7;
540 : : } while ((ch & 0x80) == 0);
541 : 88 : len += 255;
542 : 88 : chunked_data_left = len;
543 : 88 : char type = buffer[0];
544 : 88 : size_t header_len = (i - buffer.begin());
545 : 88 : buffer.erase(0, header_len);
546 : 88 : RETURN(type);
547 : : }
548 : :
549 : : bool
550 : 1655 : RemoteConnection::get_message_chunk(string &result, size_t at_least,
551 : : double end_time)
552 : : {
553 : : LOGCALL(REMOTE, bool, "RemoteConnection::get_message_chunk", result | at_least | end_time);
554 [ - + ]: 1655 : if (fdin == -1) {
555 : 0 : throw Xapian::DatabaseError("Database has been closed");
556 : : }
557 : :
558 [ + + ]: 1655 : if (at_least <= result.size()) RETURN(true);
559 : 491 : at_least -= result.size();
560 : :
561 : 491 : bool read_enough = (off_t(at_least) <= chunked_data_left);
562 [ + + ]: 491 : if (!read_enough) at_least = size_t(chunked_data_left);
563 : :
564 : 491 : read_at_least(at_least, end_time);
565 : :
566 : 441 : size_t retlen(min(off_t(buffer.size()), chunked_data_left));
567 : 441 : result.append(buffer, 0, retlen);
568 : 441 : buffer.erase(0, retlen);
569 : 441 : chunked_data_left -= retlen;
570 : :
571 : 1605 : RETURN(read_enough);
572 : : }
573 : :
574 : : /** Write n bytes from block pointed to by p to file descriptor fd. */
575 : : static void
576 : 346 : write_all(int fd, const char * p, size_t n)
577 : : {
578 [ + + ]: 692 : while (n) {
579 : 346 : ssize_t c = write(fd, p, n);
580 [ - + ]: 346 : if (c < 0) {
581 [ # # ]: 0 : if (errno == EINTR) continue;
582 : 0 : throw Xapian::NetworkError("Error writing to file", errno);
583 : : }
584 : 346 : p += c;
585 : 346 : n -= c;
586 : : }
587 : 346 : }
588 : :
589 : : char
590 : 293 : RemoteConnection::receive_file(const string &file, double end_time)
591 : : {
592 : : LOGCALL(REMOTE, char, "RemoteConnection::receive_file", file | end_time);
593 [ - + ]: 293 : if (fdin == -1) {
594 : 0 : throw Xapian::DatabaseError("Database has been closed");
595 : : }
596 : :
597 : : #ifdef __WIN32__
598 : : // Do we want to be able to delete the file during writing?
599 : : int fd = msvc_posix_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC);
600 : : #else
601 : 293 : int fd = open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0666);
602 : : #endif
603 [ - + ]: 293 : if (fd == -1) throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
604 : 293 : fdcloser closefd(fd);
605 : :
606 : 293 : read_at_least(2, end_time);
607 : 293 : size_t len = static_cast<unsigned char>(buffer[1]);
608 : 293 : read_at_least(len + 2, end_time);
609 [ + + ]: 293 : if (len != 0xff) {
610 : 205 : write_all(fd, buffer.data() + 2, len);
611 : 205 : char type = buffer[0];
612 : 205 : buffer.erase(0, len + 2);
613 : 205 : RETURN(type);
614 : : }
615 : 88 : len = 0;
616 : 88 : string::const_iterator i = buffer.begin() + 2;
617 : : unsigned char ch;
618 : 88 : int shift = 0;
619 [ + + ]: 176 : do {
620 [ + - ][ - + ]: 176 : if (i == buffer.end() || shift > 28) {
[ - + ]
621 : : // Something is very wrong...
622 : 0 : throw Xapian::NetworkError("Insane message length specified!");
623 : : }
624 : 176 : ch = *i++;
625 : 176 : len |= size_t(ch & 0x7f) << shift;
626 : 176 : shift += 7;
627 : : } while ((ch & 0x80) == 0);
628 : 88 : len += 255;
629 : 88 : size_t header_len = (i - buffer.begin());
630 : 88 : size_t remainlen(min(buffer.size() - header_len, len));
631 : 88 : write_all(fd, buffer.data() + header_len, remainlen);
632 : 88 : len -= remainlen;
633 : 88 : char type = buffer[0];
634 : 88 : buffer.erase(0, header_len + remainlen);
635 [ + + ]: 141 : while (len > 0) {
636 : 53 : read_at_least(min(len, size_t(CHUNKSIZE)), end_time);
637 : 53 : remainlen = min(buffer.size(), len);
638 : 53 : write_all(fd, buffer.data(), remainlen);
639 : 53 : len -= remainlen;
640 : 53 : buffer.erase(0, remainlen);
641 : : }
642 : 293 : RETURN(type);
643 : : }
644 : :
645 : : void
646 : 1578 : RemoteConnection::do_close(bool wait)
647 : : {
648 : : LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", wait);
649 : :
650 [ + + ]: 1578 : if (fdin >= 0) {
651 [ + + ]: 1554 : if (wait) {
652 : : // We can be called from a destructor, so we can't throw an
653 : : // exception.
654 : : try {
655 : 414 : send_message(MSG_SHUTDOWN, string(), 0.0);
656 : 0 : } catch (...) {
657 : : }
658 : : #ifdef __WIN32__
659 : : HANDLE hin = fd_to_handle(fdin);
660 : : char dummy;
661 : : DWORD received;
662 : : BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
663 : : if (!ok && GetLastError() == ERROR_IO_PENDING) {
664 : : // Wait for asynchronous read to complete.
665 : : (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
666 : : }
667 : : #else
668 : : // Wait for the connection to be closed - when this happens
669 : : // select() will report that a read won't block.
670 : : fd_set fdset;
671 : 414 : FD_ZERO(&fdset);
672 : 414 : FD_SET(fdin, &fdset);
673 : : int res;
674 [ - + # # ]: 414 : do {
[ - + ]
675 : 414 : res = select(fdin + 1, &fdset, 0, &fdset, NULL);
676 : : } while (res < 0 && errno == EINTR);
677 : : #endif
678 : : }
679 : 1554 : close_fd_or_socket(fdin);
680 : :
681 : : // If the same fd is used in both directions, don't close it twice.
682 [ + - ]: 1554 : if (fdin == fdout) fdout = -1;
683 : :
684 : 1554 : fdin = -1;
685 : : }
686 : :
687 [ - + ]: 1578 : if (fdout >= 0) {
688 : 0 : close_fd_or_socket(fdout);
689 : 0 : fdout = -1;
690 : : }
691 : 1578 : }
692 : :
693 : : #ifdef __WIN32__
694 : : DWORD
695 : : RemoteConnection::calc_read_wait_msecs(double end_time)
696 : : {
697 : : if (end_time == 0.0)
698 : : return INFINITE;
699 : :
700 : : // Calculate how far in the future end_time is.
701 : : double time_diff = end_time - RealTime::now();
702 : :
703 : : // DWORD is unsigned, so we mustn't try and return a negative value.
704 : : if (time_diff < 0.0) {
705 : : throw Xapian::NetworkTimeoutError("Timeout expired before starting read", context);
706 : : }
707 : : return static_cast<DWORD>(time_diff * 1000.0);
708 : : }
709 : : #endif
|