Branch data Line data Source code
1 : : /** @file brass_databasereplicator.cc
2 : : * @brief Support for brass database replication
3 : : */
4 : : /* Copyright 2008 Lemur Consulting Ltd
5 : : * Copyright 2009,2010 Olly Betts
6 : : *
7 : : * This program is free software; you can redistribute it and/or
8 : : * modify it under the terms of the GNU General Public License as
9 : : * published by the Free Software Foundation; either version 2 of the
10 : : * License, or (at your option) any later version.
11 : : *
12 : : * This program is distributed in the hope that it will be useful,
13 : : * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 : : * GNU General Public License for more details.
16 : : *
17 : : * You should have received a copy of the GNU General Public License
18 : : * along with this program; if not, write to the Free Software
19 : : * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
20 : : * USA
21 : : */
22 : :
23 : : #include <config.h>
24 : :
25 : : #include "brass_databasereplicator.h"
26 : :
27 : : #include "xapian/error.h"
28 : :
29 : : #include "../flint_lock.h"
30 : : #include "brass_record.h"
31 : : #include "brass_replicate_internal.h"
32 : : #include "brass_types.h"
33 : : #include "brass_version.h"
34 : : #include "debuglog.h"
35 : : #include "io_utils.h"
36 : : #include "pack.h"
37 : : #include "remoteconnection.h"
38 : : #include "replicationprotocol.h"
39 : : #include "safeerrno.h"
40 : : #include "str.h"
41 : : #include "stringutils.h"
42 : : #include "utils.h"
43 : :
44 : : #ifdef __WIN32__
45 : : # include "msvc_posix_wrapper.h"
46 : : #endif
47 : :
48 : : #include <cstdio> // For rename().
49 : :
50 : : using namespace std;
51 : : using namespace Xapian;
52 : :
53 : 59 : BrassDatabaseReplicator::BrassDatabaseReplicator(const string & db_dir_)
54 : 59 : : db_dir(db_dir_)
55 : : {
56 : 59 : }
57 : :
58 : : bool
59 : 5 : BrassDatabaseReplicator::check_revision_at_least(const string & rev,
60 : : const string & target) const
61 : : {
62 : : LOGCALL(DB, bool, "BrassDatabaseReplicator::check_revision_at_least", rev | target);
63 : :
64 : : brass_revision_number_t rev_val;
65 : : brass_revision_number_t target_val;
66 : :
67 : 5 : const char * ptr = rev.data();
68 : 5 : const char * end = ptr + rev.size();
69 [ - + ]: 5 : if (!unpack_uint(&ptr, end, &rev_val)) {
70 : 0 : throw NetworkError("Invalid revision string supplied to check_revision_at_least");
71 : : }
72 : :
73 : 5 : ptr = target.data();
74 : 5 : end = ptr + target.size();
75 [ - + ]: 5 : if (!unpack_uint(&ptr, end, &target_val)) {
76 : 0 : throw NetworkError("Invalid revision string supplied to check_revision_at_least");
77 : : }
78 : :
79 : 5 : RETURN(rev_val >= target_val);
80 : : }
81 : :
82 : : void
83 : 44 : BrassDatabaseReplicator::process_changeset_chunk_base(const string & tablename,
84 : : string & buf,
85 : : RemoteConnection & conn,
86 : : double end_time) const
87 : : {
88 : 44 : const char *ptr = buf.data();
89 : 44 : const char *end = ptr + buf.size();
90 : :
91 : : // Get the letter
92 : 44 : char letter = ptr[0];
93 [ + + - + ]: 44 : if (letter != 'A' && letter != 'B')
94 : 0 : throw NetworkError("Invalid base file letter in changeset");
95 : 44 : ++ptr;
96 : :
97 : :
98 : : // Get the base size
99 [ - + ]: 44 : if (ptr == end)
100 : 0 : throw NetworkError("Unexpected end of changeset (5)");
101 : : string::size_type base_size;
102 [ - + ]: 44 : if (!unpack_uint(&ptr, end, &base_size))
103 : 0 : throw NetworkError("Invalid base file size in changeset");
104 : :
105 : : // Get the new base file into buf.
106 : 44 : buf.erase(0, ptr - buf.data());
107 : 44 : conn.get_message_chunk(buf, base_size, end_time);
108 : :
109 [ - + ]: 44 : if (buf.size() < base_size)
110 : 0 : throw NetworkError("Unexpected end of changeset (6)");
111 : :
112 : : // Write base_size bytes from start of buf to base file for tablename
113 : 44 : string tmp_path = db_dir + "/" + tablename + "tmp";
114 : 44 : string base_path = db_dir + "/" + tablename + ".base" + letter;
115 : : #ifdef __WIN32__
116 : : int fd = msvc_posix_open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
117 : : #else
118 : 44 : int fd = ::open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
119 : : #endif
120 [ - + ]: 44 : if (fd == -1) {
121 : 0 : string msg = "Failed to open ";
122 : 0 : msg += tmp_path;
123 : 0 : throw DatabaseError(msg, errno);
124 : : }
125 : : {
126 : 44 : fdcloser closer(fd);
127 : :
128 : 44 : io_write(fd, buf.data(), base_size);
129 : 44 : io_sync(fd);
130 : : }
131 : : #if defined __WIN32__
132 : : if (msvc_posix_rename(tmp_path.c_str(), base_path.c_str()) < 0) {
133 : : #else
134 [ - + ]: 44 : if (rename(tmp_path.c_str(), base_path.c_str()) < 0) {
135 : : #endif
136 : : // With NFS, rename() failing may just mean that the server crashed
137 : : // after successfully renaming, but before reporting this, and then
138 : : // the retried operation fails. So we need to check if the source
139 : : // file still exists, which we do by calling unlink(), since we want
140 : : // to remove the temporary file anyway.
141 : 0 : int saved_errno = errno;
142 [ # # # # ]: 0 : if (unlink(tmp_path) == 0 || errno != ENOENT) {
[ # # ]
143 : 0 : string msg("Couldn't update base file ");
144 : 0 : msg += tablename;
145 : 0 : msg += ".base";
146 : 0 : msg += letter;
147 : 0 : throw DatabaseError(msg, saved_errno);
148 : : }
149 : : }
150 : :
151 : 44 : buf.erase(0, base_size);
152 : 44 : }
153 : :
154 : : void
155 : 92 : BrassDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename,
156 : : string & buf,
157 : : RemoteConnection & conn,
158 : : double end_time) const
159 : : {
160 : 92 : const char *ptr = buf.data();
161 : 92 : const char *end = ptr + buf.size();
162 : :
163 : : unsigned int changeset_blocksize;
164 [ - + ]: 92 : if (!unpack_uint(&ptr, end, &changeset_blocksize))
165 : 0 : throw NetworkError("Invalid blocksize in changeset");
166 : 92 : buf.erase(0, ptr - buf.data());
167 : :
168 : 92 : string db_path = db_dir + "/" + tablename + ".DB";
169 : : #ifdef __WIN32__
170 : : int fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_BINARY);
171 : : #else
172 : 92 : int fd = ::open(db_path.c_str(), O_WRONLY | O_BINARY, 0666);
173 : : #endif
174 [ + + ]: 92 : if (fd == -1) {
175 [ - + ]: 2 : if (file_exists(db_path)) {
176 : 0 : string msg = "Failed to open ";
177 : 0 : msg += db_path;
178 : 0 : throw DatabaseError(msg, errno);
179 : : }
180 : : #ifdef __WIN32__
181 : : fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
182 : : #else
183 : 2 : fd = ::open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
184 : : #endif
185 [ - + ]: 2 : if (fd == -1) {
186 : 0 : string msg = "Failed to create and open ";
187 : 0 : msg += db_path;
188 : 0 : throw DatabaseError(msg, errno);
189 : : }
190 : : }
191 : : {
192 : 92 : fdcloser closer(fd);
193 : :
194 : 83 : while (true) {
195 : 175 : conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
196 : 164 : ptr = buf.data();
197 : 164 : end = ptr + buf.size();
198 : :
199 : : uint4 block_number;
200 [ - + ]: 164 : if (!unpack_uint(&ptr, end, &block_number))
201 : 0 : throw NetworkError("Invalid block number in changeset");
202 : 164 : buf.erase(0, ptr - buf.data());
203 [ + + ]: 164 : if (block_number == 0)
204 : : break;
205 : 87 : --block_number;
206 : :
207 : 87 : conn.get_message_chunk(buf, changeset_blocksize, end_time);
208 [ - + ]: 83 : if (buf.size() < changeset_blocksize)
209 : 0 : throw NetworkError("Incomplete block in changeset");
210 : :
211 : : // Write the block.
212 : : // FIXME - should use pwrite if that's available.
213 [ - + ]: 83 : if (lseek(fd, off_t(changeset_blocksize) * block_number, SEEK_SET) == -1) {
214 : 0 : string msg = "Failed to seek to block ";
215 : 0 : msg += str(block_number);
216 : 0 : throw DatabaseError(msg, errno);
217 : : }
218 : 83 : io_write(fd, buf.data(), changeset_blocksize);
219 : :
220 : 83 : buf.erase(0, changeset_blocksize);
221 : : }
222 : 92 : io_sync(fd);
223 : 92 : }
224 : 77 : }
225 : :
226 : : string
227 : 49 : BrassDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,
228 : : double end_time,
229 : : bool valid) const
230 : : {
231 : : LOGCALL(DB, string, "BrassDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
232 : :
233 : : // Lock the database to perform modifications.
234 : 49 : FlintLock lock(db_dir);
235 : 49 : string explanation;
236 : 49 : FlintLock::reason why = lock.lock(true, explanation);
237 [ - + ]: 49 : if (why != FlintLock::SUCCESS) {
238 : 0 : lock.throw_databaselockerror(why, db_dir, explanation);
239 : : }
240 : :
241 : 49 : char type = conn.get_message_chunked(end_time);
242 : : (void) type; // Don't give warning about unused variable.
243 : : AssertEq(type, REPL_REPLY_CHANGESET);
244 : :
245 : 28 : string buf;
246 : : // Read enough to be certain that we've got the header part of the
247 : : // changeset.
248 : :
249 : 28 : conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
250 : : // Check the magic string.
251 [ - + ]: 28 : if (!startswith(buf, CHANGES_MAGIC_STRING)) {
252 : 0 : throw NetworkError("Invalid ChangeSet magic string");
253 : : }
254 : 28 : buf.erase(0, 12);
255 : 28 : const char *ptr = buf.data();
256 : 28 : const char *end = ptr + buf.size();
257 : :
258 : : unsigned int changes_version;
259 [ - + ]: 28 : if (!unpack_uint(&ptr, end, &changes_version))
260 : 0 : throw NetworkError("Couldn't read a valid version number from changeset");
261 [ - + ]: 28 : if (changes_version != CHANGES_VERSION)
262 : 0 : throw NetworkError("Unsupported changeset version");
263 : :
264 : : brass_revision_number_t startrev;
265 : : brass_revision_number_t endrev;
266 : :
267 [ - + ]: 28 : if (!unpack_uint(&ptr, end, &startrev))
268 : 0 : throw NetworkError("Couldn't read a valid start revision from changeset");
269 [ - + ]: 28 : if (!unpack_uint(&ptr, end, &endrev))
270 : 0 : throw NetworkError("Couldn't read a valid end revision from changeset");
271 : :
272 [ - + ]: 28 : if (endrev <= startrev)
273 : 0 : throw NetworkError("End revision in changeset is not later than start revision");
274 : :
275 [ - + ]: 28 : if (ptr == end)
276 : 0 : throw NetworkError("Unexpected end of changeset (1)");
277 : :
278 [ + - ]: 28 : if (valid) {
279 : : // Check the revision number.
280 : : // If the database was not known to be valid, we cannot
281 : : // reliably determine its revision number, so must skip this
282 : : // check.
283 : 28 : BrassRecordTable record_table(db_dir, true);
284 : 28 : record_table.open();
285 [ + + ]: 28 : if (startrev != record_table.get_open_revision_number())
286 : 29 : throw NetworkError("Changeset supplied is for wrong revision number");
287 : : }
288 : :
289 : 27 : unsigned char changes_type = ptr[0];
290 [ - + ]: 27 : if (changes_type != 0) {
291 : 0 : throw NetworkError("Unsupported changeset type: " + str(changes_type));
292 : : // FIXME - support changes of type 1, produced when DANGEROUS mode is
293 : : // on.
294 : : }
295 : :
296 : : // Clear the bits of the buffer which have been read.
297 : 27 : buf.erase(0, ptr + 1 - buf.data());
298 : :
299 : : // Read the items from the changeset.
300 : 136 : while (true) {
301 : 148 : conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
302 : 147 : ptr = buf.data();
303 : 147 : end = ptr + buf.size();
304 : :
305 : : // Read the type of the next chunk of data
306 [ - + ]: 147 : if (ptr == end)
307 : 0 : throw NetworkError("Unexpected end of changeset (2)");
308 : 147 : unsigned char chunk_type = ptr[0];
309 : 147 : ++ptr;
310 [ + + ]: 147 : if (chunk_type == 0)
311 : : break;
312 : :
313 : : // Get the tablename.
314 : 136 : string tablename;
315 [ - + ]: 136 : if (!unpack_string(&ptr, end, tablename))
316 : 0 : throw NetworkError("Unexpected end of changeset (3)");
317 [ - + ]: 136 : if (tablename.empty())
318 : 0 : throw NetworkError("Missing tablename in changeset");
319 [ - + ]: 136 : if (tablename.find_first_not_of("abcdefghijklmnopqrstuvwxyz") !=
320 : : tablename.npos)
321 : 0 : throw NetworkError("Invalid character in tablename in changeset");
322 : :
323 : : // Process the chunk
324 [ - + ]: 136 : if (ptr == end)
325 : 0 : throw NetworkError("Unexpected end of changeset (4)");
326 : 136 : buf.erase(0, ptr - buf.data());
327 : :
328 [ + + - ]: 136 : switch (chunk_type) {
329 : : case 1:
330 : 44 : process_changeset_chunk_base(tablename, buf, conn, end_time);
331 : 44 : break;
332 : : case 2:
333 : 92 : process_changeset_chunk_blocks(tablename, buf, conn, end_time);
334 : 77 : break;
335 : : default:
336 : 0 : throw NetworkError("Unrecognised item type in changeset");
337 : : }
338 : : }
339 : : brass_revision_number_t reqrev;
340 [ - + ]: 11 : if (!unpack_uint(&ptr, end, &reqrev))
341 : 0 : throw NetworkError("Couldn't read a valid required revision from changeset");
342 [ - + ]: 11 : if (reqrev < endrev)
343 : 0 : throw NetworkError("Required revision in changeset is earlier than end revision");
344 [ - + ]: 11 : if (ptr != end)
345 : 0 : throw NetworkError("Junk found at end of changeset");
346 : :
347 : 11 : buf.resize(0);
348 : 11 : pack_uint(buf, reqrev);
349 : 104 : RETURN(buf);
350 : : }
351 : :
352 : : string
353 : 10 : BrassDatabaseReplicator::get_uuid() const
354 : : {
355 : : LOGCALL(DB, string, "BrassDatabaseReplicator::get_uuid", NO_ARGS);
356 : 10 : BrassVersion version_file(db_dir);
357 : : try {
358 : 10 : version_file.read_and_check();
359 : 0 : } catch (const Xapian::DatabaseError &) {
360 : 0 : RETURN(string());
361 : : }
362 : 10 : RETURN(version_file.get_uuid_string());
363 : : }
|