Branch data Line data Source code
1 : : /** @file flint_databasereplicator.cc
2 : : * @brief Support for flint database replication
3 : : */
4 : : /* Copyright 2008 Lemur Consulting Ltd
5 : : * Copyright 2009,2010 Olly Betts
6 : : * Copyright 2010 Richard Boulton
7 : : *
8 : : * This program is free software; you can redistribute it and/or
9 : : * modify it under the terms of the GNU General Public License as
10 : : * published by the Free Software Foundation; either version 2 of the
11 : : * License, or (at your option) any later version.
12 : : *
13 : : * This program is distributed in the hope that it will be useful,
14 : : * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 : : * GNU General Public License for more details.
17 : : *
18 : : * You should have received a copy of the GNU General Public License
19 : : * along with this program; if not, write to the Free Software
20 : : * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
21 : : * USA
22 : : */
23 : :
24 : : #include <config.h>
25 : :
26 : : #include "flint_databasereplicator.h"
27 : :
28 : : #include "xapian/error.h"
29 : :
30 : : #include "../flint_lock.h"
31 : : #include "flint_record.h"
32 : : #include "flint_replicate_internal.h"
33 : : #include "flint_types.h"
34 : : #include "flint_utils.h"
35 : : #include "flint_version.h"
36 : : #include "debuglog.h"
37 : : #include "io_utils.h"
38 : : #include "remoteconnection.h"
39 : : #include "replicate_utils.h"
40 : : #include "replicationprotocol.h"
41 : : #include "safeerrno.h"
42 : : #include "str.h"
43 : : #include "stringutils.h"
44 : : #include "utils.h"
45 : :
46 : : #ifdef __WIN32__
47 : : # include "msvc_posix_wrapper.h"
48 : : #endif
49 : :
50 : : #include <cstdio> // For rename().
51 : :
52 : : using namespace std;
53 : : using namespace Xapian;
54 : :
55 : 68 : FlintDatabaseReplicator::FlintDatabaseReplicator(const string & db_dir_)
56 : : : db_dir(db_dir_),
57 : 68 : max_changesets(0)
58 : : {
59 : 68 : const char *p = getenv("XAPIAN_MAX_CHANGESETS");
60 [ + - # # ]: 68 : if (p)
61 : 68 : max_changesets = atoi(p);
62 : 68 : }
63 : :
64 : : bool
65 : 8 : FlintDatabaseReplicator::check_revision_at_least(const string & rev,
66 : : const string & target) const
67 : : {
68 : : LOGCALL(DB, bool, "FlintDatabaseReplicator::check_revision_at_least", rev | target);
69 : :
70 : : flint_revision_number_t rev_val;
71 : : flint_revision_number_t target_val;
72 : :
73 : 8 : const char * ptr = rev.data();
74 : 8 : const char * end = ptr + rev.size();
75 [ - + ]: 8 : if (!F_unpack_uint(&ptr, end, &rev_val)) {
76 : 0 : throw NetworkError("Invalid revision string supplied to check_revision_at_least");
77 : : }
78 : :
79 : 8 : ptr = target.data();
80 : 8 : end = ptr + target.size();
81 [ - + ]: 8 : if (!F_unpack_uint(&ptr, end, &target_val)) {
82 : 0 : throw NetworkError("Invalid revision string supplied to check_revision_at_least");
83 : : }
84 : :
85 : 8 : RETURN(rev_val >= target_val);
86 : : }
87 : :
88 : : void
89 : 60 : FlintDatabaseReplicator::process_changeset_chunk_base(const string & tablename,
90 : : string & buf,
91 : : RemoteConnection & conn,
92 : : double end_time,
93 : : int changes_fd) const
94 : : {
95 : 60 : const char *ptr = buf.data();
96 : 60 : const char *end = ptr + buf.size();
97 : :
98 : : // Get the letter
99 : 60 : char letter = ptr[0];
100 [ + + - + ]: 60 : if (letter != 'A' && letter != 'B')
101 : 0 : throw NetworkError("Invalid base file letter in changeset");
102 : 60 : ++ptr;
103 : :
104 : :
105 : : // Get the base size
106 [ - + ]: 60 : if (ptr == end)
107 : 0 : throw NetworkError("Unexpected end of changeset (5)");
108 : : string::size_type base_size;
109 [ - + ]: 60 : if (!F_unpack_uint(&ptr, end, &base_size))
110 : 0 : throw NetworkError("Invalid base file size in changeset");
111 : :
112 : : // Get the new base file into buf.
113 : 60 : write_and_clear_changes(changes_fd, buf, ptr - buf.data());
114 : 60 : conn.get_message_chunk(buf, base_size, end_time);
115 : :
116 [ - + ]: 60 : if (buf.size() < base_size)
117 : 0 : throw NetworkError("Unexpected end of changeset (6)");
118 : :
119 : : // Write base_size bytes from start of buf to base file for tablename
120 : 60 : string tmp_path = db_dir + "/" + tablename + "tmp";
121 : 60 : string base_path = db_dir + "/" + tablename + ".base" + letter;
122 : : #ifdef __WIN32__
123 : : int fd = msvc_posix_open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
124 : : #else
125 : 60 : int fd = ::open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
126 : : #endif
127 [ - + ]: 60 : if (fd == -1) {
128 : 0 : string msg = "Failed to open ";
129 : 0 : msg += tmp_path;
130 : 0 : throw DatabaseError(msg, errno);
131 : : }
132 : : {
133 : 60 : fdcloser closer(fd);
134 : :
135 : 60 : io_write(fd, buf.data(), base_size);
136 : 60 : io_sync(fd);
137 : : }
138 : :
139 : : // Finish writing the changeset before moving the base file into place.
140 : 60 : write_and_clear_changes(changes_fd, buf, base_size);
141 : :
142 : : #if defined __WIN32__
143 : : if (msvc_posix_rename(tmp_path.c_str(), base_path.c_str()) < 0) {
144 : : #else
145 [ - + ]: 60 : if (rename(tmp_path.c_str(), base_path.c_str()) < 0) {
146 : : #endif
147 : : // With NFS, rename() failing may just mean that the server crashed
148 : : // after successfully renaming, but before reporting this, and then
149 : : // the retried operation fails. So we need to check if the source
150 : : // file still exists, which we do by calling unlink(), since we want
151 : : // to remove the temporary file anyway.
152 : 0 : int saved_errno = errno;
153 [ # # # # ]: 0 : if (unlink(tmp_path) == 0 || errno != ENOENT) {
[ # # ]
154 : 0 : string msg("Couldn't update base file ");
155 : 0 : msg += tablename;
156 : 0 : msg += ".base";
157 : 0 : msg += letter;
158 : 0 : throw DatabaseError(msg, saved_errno);
159 : : }
160 : 60 : }
161 : 60 : }
162 : :
163 : : void
164 : 125 : FlintDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename,
165 : : string & buf,
166 : : RemoteConnection & conn,
167 : : double end_time,
168 : : int changes_fd) const
169 : : {
170 : 125 : const char *ptr = buf.data();
171 : 125 : const char *end = ptr + buf.size();
172 : :
173 : : unsigned int changeset_blocksize;
174 [ - + ]: 125 : if (!F_unpack_uint(&ptr, end, &changeset_blocksize))
175 : 0 : throw NetworkError("Invalid blocksize in changeset");
176 : 125 : write_and_clear_changes(changes_fd, buf, ptr - buf.data());
177 : :
178 : 125 : string db_path = db_dir + "/" + tablename + ".DB";
179 : : #ifdef __WIN32__
180 : : int fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_BINARY);
181 : : #else
182 : 125 : int fd = ::open(db_path.c_str(), O_WRONLY | O_BINARY, 0666);
183 : : #endif
184 [ + + ]: 125 : if (fd == -1) {
185 [ - + ]: 1 : if (file_exists(db_path)) {
186 : 0 : string msg = "Failed to open ";
187 : 0 : msg += db_path;
188 : 0 : throw DatabaseError(msg, errno);
189 : : }
190 : : #ifdef __WIN32__
191 : : fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
192 : : #else
193 : 1 : fd = ::open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
194 : : #endif
195 [ - + ]: 1 : if (fd == -1) {
196 : 0 : string msg = "Failed to create and open ";
197 : 0 : msg += db_path;
198 : 0 : throw DatabaseError(msg, errno);
199 : : }
200 : : }
201 : : {
202 : 125 : fdcloser closer(fd);
203 : :
204 : 119 : while (true) {
205 : 244 : conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
206 : 232 : ptr = buf.data();
207 : 232 : end = ptr + buf.size();
208 : :
209 : : uint4 block_number;
210 [ - + ]: 232 : if (!F_unpack_uint(&ptr, end, &block_number))
211 : 0 : throw NetworkError("Invalid block number in changeset");
212 : 232 : write_and_clear_changes(changes_fd, buf, ptr - buf.data());
213 [ + + ]: 232 : if (block_number == 0)
214 : : break;
215 : 124 : --block_number;
216 : :
217 : 124 : conn.get_message_chunk(buf, changeset_blocksize, end_time);
218 [ - + ]: 119 : if (buf.size() < changeset_blocksize)
219 : 0 : throw NetworkError("Incomplete block in changeset");
220 : :
221 : : // Write the block.
222 : : // FIXME - should use pwrite if that's available.
223 [ - + ]: 119 : if (lseek(fd, off_t(changeset_blocksize) * block_number, SEEK_SET) == -1) {
224 : 0 : string msg = "Failed to seek to block ";
225 : 0 : msg += str(block_number);
226 : 0 : throw DatabaseError(msg, errno);
227 : : }
228 : 119 : io_write(fd, buf.data(), changeset_blocksize);
229 : :
230 : 119 : write_and_clear_changes(changes_fd, buf, changeset_blocksize);
231 : : }
232 : 125 : io_sync(fd);
233 : 125 : }
234 : 108 : }
235 : :
236 : : string
237 : 52 : FlintDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,
238 : : double end_time,
239 : : bool valid) const
240 : : {
241 : : LOGCALL(DB, string, "FlintDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
242 : :
243 : : // Lock the database to perform modifications.
244 : 52 : FlintLock lock(db_dir);
245 : 52 : string explanation;
246 : 52 : FlintLock::reason why = lock.lock(true, explanation);
247 [ - + ]: 52 : if (why != FlintLock::SUCCESS) {
248 : 0 : lock.throw_databaselockerror(why, db_dir, explanation);
249 : : }
250 : :
251 : 52 : char type = conn.get_message_chunked(end_time);
252 : : (void) type; // Don't give warning about unused variable.
253 : : AssertEq(type, REPL_REPLY_CHANGESET);
254 : :
255 : 31 : string buf;
256 : : // Read enough to be certain that we've got the header part of the
257 : : // changeset.
258 : :
259 : 31 : conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
260 : : // Check the magic string.
261 [ - + ]: 31 : if (!startswith(buf, CHANGES_MAGIC_STRING)) {
262 : 0 : throw NetworkError("Invalid ChangeSet magic string");
263 : : }
264 : 31 : const char *ptr = buf.data();
265 : 31 : const char *end = ptr + buf.size();
266 : 31 : ptr += CONST_STRLEN(CHANGES_MAGIC_STRING);
267 : :
268 : : unsigned int changes_version;
269 [ - + ]: 31 : if (!F_unpack_uint(&ptr, end, &changes_version))
270 : 0 : throw NetworkError("Couldn't read a valid version number from changeset");
271 [ - + ]: 31 : if (changes_version != CHANGES_VERSION)
272 : 0 : throw NetworkError("Unsupported changeset version");
273 : :
274 : : flint_revision_number_t startrev;
275 : : flint_revision_number_t endrev;
276 : :
277 [ - + ]: 31 : if (!F_unpack_uint(&ptr, end, &startrev))
278 : 0 : throw NetworkError("Couldn't read a valid start revision from changeset");
279 [ - + ]: 31 : if (!F_unpack_uint(&ptr, end, &endrev))
280 : 0 : throw NetworkError("Couldn't read a valid end revision from changeset");
281 : :
282 [ - + ]: 31 : if (endrev <= startrev)
283 : 0 : throw NetworkError("End revision in changeset is not later than start revision");
284 : :
285 [ - + ]: 31 : if (ptr == end)
286 : 0 : throw NetworkError("Unexpected end of changeset (1)");
287 : :
288 : 31 : int changes_fd = -1;
289 : 31 : string changes_name;
290 [ + - ]: 31 : if (max_changesets > 0) {
291 : : changes_fd = create_changeset_file(db_dir, "changes" + str(startrev),
292 : 31 : changes_name);
293 : : }
294 : 31 : fdcloser closer(changes_fd);
295 : :
296 [ + - ]: 31 : if (valid) {
297 : : // Check the revision number.
298 : : // If the database was not known to be valid, we cannot
299 : : // reliably determine its revision number, so must skip this
300 : : // check.
301 : 31 : FlintRecordTable record_table(db_dir, true);
302 : 31 : record_table.open();
303 [ + + ]: 31 : if (startrev != record_table.get_open_revision_number())
304 : 32 : throw NetworkError("Changeset supplied is for wrong revision number");
305 : : }
306 : :
307 : 30 : unsigned char changes_type = ptr[0];
308 [ - + ]: 30 : if (changes_type != 0) {
309 : 0 : throw NetworkError("Unsupported changeset type: " + str(changes_type));
310 : : // FIXME - support changes of type 1, produced when DANGEROUS mode is
311 : : // on.
312 : : }
313 : :
314 : : // Write and clear the bits of the buffer which have been read.
315 : 30 : write_and_clear_changes(changes_fd, buf, ptr + 1 - buf.data());
316 : :
317 : : // Read the items from the changeset.
318 : 185 : while (true) {
319 : 198 : conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
320 : 197 : ptr = buf.data();
321 : 197 : end = ptr + buf.size();
322 : :
323 : : // Read the type of the next chunk of data
324 [ - + ]: 197 : if (ptr == end)
325 : 0 : throw NetworkError("Unexpected end of changeset (2)");
326 : 197 : unsigned char chunk_type = ptr[0];
327 : 197 : ++ptr;
328 [ + + ]: 197 : if (chunk_type == 0)
329 : : break;
330 : :
331 : : // Get the tablename.
332 : 185 : string tablename;
333 [ - + ]: 185 : if (!F_unpack_string(&ptr, end, tablename))
334 : 0 : throw NetworkError("Unexpected end of changeset (3)");
335 [ - + ]: 185 : if (tablename.empty())
336 : 0 : throw NetworkError("Missing tablename in changeset");
337 [ - + ]: 185 : if (tablename.find_first_not_of("abcdefghijklmnopqrstuvwxyz") !=
338 : : tablename.npos)
339 : 0 : throw NetworkError("Invalid character in tablename in changeset");
340 : :
341 : : // Process the chunk
342 [ - + ]: 185 : if (ptr == end)
343 : 0 : throw NetworkError("Unexpected end of changeset (4)");
344 : 185 : write_and_clear_changes(changes_fd, buf, ptr - buf.data());
345 : :
346 [ + + - ]: 185 : switch (chunk_type) {
347 : : case 1:
348 : : process_changeset_chunk_base(tablename, buf, conn, end_time,
349 : 60 : changes_fd);
350 : 60 : break;
351 : : case 2:
352 : : process_changeset_chunk_blocks(tablename, buf, conn, end_time,
353 : 125 : changes_fd);
354 : 108 : break;
355 : : default:
356 : 0 : throw NetworkError("Unrecognised item type in changeset");
357 : : }
358 : : }
359 : : flint_revision_number_t reqrev;
360 [ - + ]: 12 : if (!F_unpack_uint(&ptr, end, &reqrev))
361 : 0 : throw NetworkError("Couldn't read a valid required revision from changeset");
362 [ - + ]: 12 : if (reqrev < endrev)
363 : 0 : throw NetworkError("Required revision in changeset is earlier than end revision");
364 [ - + ]: 12 : if (ptr != end)
365 : 0 : throw NetworkError("Junk found at end of changeset");
366 : :
367 : 12 : write_and_clear_changes(changes_fd, buf, buf.size());
368 : 12 : buf = F_pack_uint(reqrev);
369 : 149 : RETURN(buf);
370 : : }
371 : :
372 : : string
373 : 16 : FlintDatabaseReplicator::get_uuid() const
374 : : {
375 : : LOGCALL(DB, string, "FlintDatabaseReplicator::get_uuid", NO_ARGS);
376 : 16 : FlintVersion version_file(db_dir);
377 : : try {
378 : 16 : version_file.read_and_check(true);
379 : 0 : } catch (const DatabaseError &) {
380 : 0 : RETURN(string());
381 : : }
382 : 16 : RETURN(version_file.get_uuid_string());
383 : : }
|