Changeset 23571
- Timestamp:
- Mar 26, 2009, 4:26:44 PM (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/neb_distrib_20081210/Nebulous-Server/lib/Nebulous/Server.pm
r23537 r23571 64 64 } 65 65 66 sub db 66 # EAM : In the 'distributed' version of Nebulous, there is a collection of N databases 67 # the db_index uniquely defines the db used by a given key 68 sub db_index_for_key 67 69 { 68 70 my ($self, $key) = @_; 71 72 my $config = $self->config; 73 74 my $db_index = 0; 75 die "key not defined" unless defined $key; 76 77 # hash the key to select the correct database instance 78 # only use the first 8 hex chars... have to be careful to avoid an int 79 # overflow here 80 # XXX this needs to be modified to follow my key prescription: 81 # neb://host/path/path/filename.ext -> use path/path/filename (ignore neb://host and .ext) 82 # EAM : test to double-check hashing behavior 83 $db_index = unpack("h8", sha1_hex("$key")) % $config->n_db; 84 85 return $db_index; 86 } 87 88 sub db_for_index 89 { 90 my ($self, $db_index) = @_; 69 91 70 92 my $log = $self->log; 71 93 my $sql = $self->sql; 72 94 my $config = $self->config; 73 74 my $db_index = 0;75 if (defined $key) {76 # hash the key to select the correct database instance77 # only use the first 8 hex chars... have to be careful to avoid an int78 # overflow here79 $db_index = unpack("h8", sha1_hex("$key")) % $config->n_db;80 }81 95 82 96 # lookup to see if we have a stored dbh for this database … … 128 142 } 129 143 144 sub db 145 { 146 my ($self, $key) = @_; 147 148 my $log = $self->log; 149 my $sql = $self->sql; 150 my $config = $self->config; 151 152 die "key not defined" unless defined $key; 153 my $db_index = $self->db_index_for_key($key); 154 155 my $dbh = $self->db_for_index($db_index); 156 return $dbh; 157 } 158 159 # XXX need a function to loop over index values 130 160 131 161 sub create_object … … 322 352 } 323 353 324 325 # sub swap_objects 326 # { 327 # my $self = shift; 328 # 329 # my ($key1, $key2) = validate_pos(@_, 330 # { 331 # type => SCALAR, 332 # callbacks => { 333 # 'is valid object key' => sub { $self->_is_valid_object_key($_[0]) }, 334 # }, 335 # }, 336 # { 337 # type => SCALAR, 338 # callbacks => { 339 # 'is valid object key' => sub { $self->_is_valid_object_key($_[0]) }, 340 # }, 341 # }, 342 # ); 343 # 344 # my $log = $self->log; 345 # my $sql = $self->sql; 346 # my $dbh1 = $self->db($key1); 347 # my $dbh2 = $self->db($key2); 348 # 349 # $log->debug("entered - @_"); 350 # 351 # # ignore volumes 352 # $key1 = parse_neb_key($key1); 353 # $key2 = parse_neb_key($key2); 354 # 355 # # order of operations for the swap with a single db is: 356 # # key1 -> key1.swap 357 # # key2 -> key1 358 # # key1.swap -> key2 359 # 360 # # XXX this cmp will only work if ->db() returns the same exact (cached) dbh 361 # if ($dbh1 == $dbh2) { 362 # my $dbh = $dbh1; 363 # eval { 364 # { 365 # # key1 -> key1.swap 366 # my $query = $dbh->prepare_cached($sql->rename_object); 367 # # this SQL statment takes the new key name as the first param 368 # my $rows = $query->execute($key1->path . ".swap", $key1->path); 369 # 370 # # if we affected more then one row something very bad has happened. 371 # unless ($rows == 1) { 372 # $query->finish; 373 # $log->logdie("affected row count is $rows instead of 1"); 374 # } 375 # } 376 # 377 # { 378 # # key2 -> key1 379 # my $query = $dbh->prepare_cached($sql->rename_object); 380 # # this SQL statment takes the new key name as the first param 381 # my $rows = $query->execute($key1->path, $key2->path); 382 # 383 # # if we affected more then one row something very bad has happened. 384 # unless ($rows == 1) { 385 # $query->finish; 386 # $log->logdie("affected row count is $rows instead of 1"); 387 # } 388 # } 389 # 390 # { 391 # # key1.swap -> key2 392 # my $query = $dbh->prepare_cached($sql->rename_object); 393 # # this SQL statment takes the new key name as the first param 394 # my $rows = $query->execute($key2->path, $key1->path . ".swap"); 395 # 396 # # if we affected more then one row something very bad has happened. 397 # unless ($rows == 1) { 398 # $query->finish; 399 # $log->logdie("affected row count is $rows instead of 1"); 400 # } 401 # } 402 # 403 # $dbn->commit; 404 # $log->debug("commit"); 405 # }; 406 # if ($@) { 407 # $dbh->rollback; 408 # $log->debug("rollback"); 409 # $log->logdie("database error: $@"); 410 # } 411 # } 412 # 413 # # order of operations for the swap between two dbs is: 414 # # key1 start transaction 415 # # key1 -> read all instances 416 # # key1 -> remove all instances 417 # # key2 start transaction 418 # # key2 -> read all instances 419 # # key2 -> remove all instances 420 # # key1 -> insert key 2 instances 421 # # key2 -> insert key 1 instances 422 # # key1,2 commit 423 # 424 # eval { 425 # { 426 # # key1 -> read all instances 427 # my $query = $dbh->prepare_cached($sql->rename_object); 428 # # this SQL statment takes the new key name as the first param 429 # my $rows = $query->execute($key1->path . ".swap", $key1->path); 430 # 431 # # if we affected more then one row something very bad has happened. 432 # unless ($rows == 1) { 433 # $query->finish; 434 # $log->logdie("affected row count is $rows instead of 1"); 435 # } 436 # } 437 # 438 # { 439 # # key2 -> key1 440 # my $query = $db->prepare_cached($sql->rename_object); 441 # # this SQL statment takes the new key name as the first param 442 # my $rows = $query->execute($key1->path, $key2->path); 443 # 444 # # if we affected more then one row something very bad has happened. 445 # unless ($rows == 1) { 446 # $query->finish; 447 # $log->logdie("affected row count is $rows instead of 1"); 448 # } 449 # } 450 # 451 # { 452 # # key1.swap -> key2 453 # my $query = $db->prepare_cached($sql->rename_object); 454 # # this SQL statment takes the new key name as the first param 455 # my $rows = $query->execute($key2->path, $key1->path . ".swap"); 456 # 457 # # if we affected more then one row something very bad has happened. 458 # unless ($rows == 1) { 459 # $query->finish; 460 # $log->logdie("affected row count is $rows instead of 1"); 461 # } 462 # } 463 # 464 # $db->commit; 465 # $log->debug("commit"); 466 # }; 467 # if ($@) { 468 # $db->rollback; 469 # $log->debug("rollback"); 470 # $log->logdie("database error: $@"); 471 # } 472 # 473 # 474 # $log->debug("leaving"); 475 # 476 # return 1; 477 # } 478 # 479 480 # ### this block below comes from the single branch, and implements 481 # the transactions in the new way (two passes?). the above handles 482 # the case of multiple dbs 483 # 484 # TRANS: while (1) { 485 # eval { 486 # { 487 # # key1 -> key1.swap 488 # my $query = $db->prepare_cached($sql->rename_object); 489 # # this SQL statment takes the new key name as the first param 490 # my $rows = $query->execute($key1->path . ".swap", $key1->path); 491 # 492 # # if we affected more then one row something very bad has happened. 493 # unless ($rows == 1) { 494 # $query->finish; 495 # $log->logdie("affected row count is $rows instead of 1"); 496 # } 497 # } 498 # 499 # { 500 # # key2 -> key1 501 # my $query = $db->prepare_cached($sql->rename_object); 502 # # this SQL statment takes the new key name as the first param 503 # my $rows = $query->execute($key1->path, $key2->path); 504 # 505 # # if we affected more then one row something very bad has happened. 506 # unless ($rows == 1) { 507 # $query->finish; 508 # $log->logdie("affected row count is $rows instead of 1"); 509 # } 510 # } 511 # 512 # { 513 # # key1.swap -> key2 514 # my $query = $db->prepare_cached($sql->rename_object); 515 # # this SQL statment takes the new key name as the first param 516 # my $rows = $query->execute($key2->path, $key1->path . ".swap"); 517 # 518 # # if we affected more then one row something very bad has happened. 519 # unless ($rows == 1) { 520 # $query->finish; 521 # $log->logdie("affected row count is $rows instead of 1"); 522 # } 523 # } 524 # 525 # $db->commit; 526 # $log->debug("commit"); 527 # }; 528 # if ($@) { 529 # $db->rollback; 530 # $log->debug("rollback"); 531 # if ($@ =~ /Deadlock found/) { 532 # $log->warn("database deadlock retrying transaction: $@"); 533 # redo TRANS; 534 # } 535 # $log->logdie("database error: $@"); 536 # } 537 # last; 538 # } 539 # 540 # $log->debug("leaving"); 541 # 542 # return 1; 543 # } 354 sub swap_objects 355 { 356 my $self = shift; 357 358 my ($key1, $key2) = validate_pos(@_, 359 { 360 type => SCALAR, 361 callbacks => { 362 'is valid object key' => sub { $self->_is_valid_object_key($_[0]) }, 363 }, 364 }, 365 { 366 type => SCALAR, 367 callbacks => { 368 'is valid object key' => sub { $self->_is_valid_object_key($_[0]) }, 369 }, 370 }, 371 ); 372 373 my $log = $self->log; 374 my $sql = $self->sql; 375 376 my $dbidx1 = $self->db_index_for_key($key1); 377 my $dbidx2 = $self->db_index_for_key($key2); 378 die "cannot swap keys not stored on the same database" unless ($dbidx1 == $dbidx2); 379 380 my $dbh1 = $self->db_for_index($dbidx1); 381 my $dbh2 = $self->db_for_index($dbidx2); 382 die "different db handles for the same db?" unless ($dbh1 == $dbh2); 383 384 $log->debug("entered - @_"); 385 386 # ignore volumes 387 $key1 = parse_neb_key($key1); 388 $key2 = parse_neb_key($key2); 389 390 # order of operations for the swap with a single db is: 391 # key1 -> key1.swap 392 # key2 -> key1 393 # key1.swap -> key2 394 395 my $dbh = $dbh1; 396 TRANS: while (1) { 397 eval { 398 { 399 # key1 -> key1.swap 400 my $query = $db->prepare_cached($sql->rename_object); 401 # this SQL statment takes the new key name as the first param 402 my $rows = $query->execute($key1->path . ".swap", $key1->path); 403 404 # if we affected more then one row something very bad has happened. 405 unless ($rows == 1) { 406 $query->finish; 407 $log->logdie("affected row count is $rows instead of 1"); 408 } 409 } 410 411 { 412 # key2 -> key1 413 my $query = $db->prepare_cached($sql->rename_object); 414 # this SQL statment takes the new key name as the first param 415 my $rows = $query->execute($key1->path, $key2->path); 416 417 # if we affected more then one row something very bad has happened. 418 unless ($rows == 1) { 419 $query->finish; 420 $log->logdie("affected row count is $rows instead of 1"); 421 } 422 } 423 424 { 425 # key1.swap -> key2 426 my $query = $db->prepare_cached($sql->rename_object); 427 # this SQL statment takes the new key name as the first param 428 my $rows = $query->execute($key2->path, $key1->path . ".swap"); 429 430 # if we affected more then one row something very bad has happened. 431 unless ($rows == 1) { 432 $query->finish; 433 $log->logdie("affected row count is $rows instead of 1"); 434 } 435 } 436 437 $db->commit; 438 $log->debug("commit"); 439 }; 440 if ($@) { 441 $db->rollback; 442 $log->debug("rollback"); 443 if ($@ =~ /Deadlock found/) { 444 $log->warn("database deadlock retrying transaction: $@"); 445 redo TRANS; 446 } 447 $log->logdie("database error: $@"); 448 } 449 last; 450 } 451 452 $log->debug("leaving"); 453 454 return 1; 455 } 456 457 # EAM : from JH, below is a possible option to swap instances across 458 # dbs. I recommend that this action be disallowed, and instead such 459 # operations be implemented only as a combination of copy and delete 460 461 # order of operations for the swap between two dbs is: 462 # key1 start transaction 463 # key1 -> read all instances 464 # key1 -> remove all instances 465 # key2 start transaction 466 # key2 -> read all instances 467 # key2 -> remove all instances 468 # key1 -> insert key 2 instances 469 # key2 -> insert key 1 instances 470 # key1,2 commit 544 471 545 472 sub replicate_object … … 644 571 645 572 # TODO add some stuff here to retry if unsucessful 573 # XXX if this fails, it should try to generate the 574 # instance on another volume (unless !$soft_volume) 646 575 $uri = $self->_create_empty_instance_file($key->path, $so_id, $ins_id, $vol_path, $vol_xattr); 647 576 … … 1137 1066 } 1138 1067 1139 1068 # loop over all db_index values, passing db_index to each call 1140 1069 sub find_objects 1141 1070 { 1142 # XXX: this will only search one db1143 1144 1071 my $self = shift; 1145 1072 … … 1153 1080 my $log = $self->log; 1154 1081 my $sql = $self->sql; 1155 my $db = $self->db;1156 1082 1157 1083 $log->debug( "entered - @_" ); … … 1164 1090 # attempt to strip off neb:// if it exists 1165 1091 $pattern =~ s|^neb:||; 1092 1093 my @keys = (); 1094 my $n_dbs = $self->n_db(); 1095 for (my $index = 0; $index < $n_dbs; $index ++) { 1096 1097 my $newkeys = $self->find_objects_for_index($index, $pattern); 1098 push @keys, @$newkeys; 1099 } 1100 $log->logdie("no keys found") unless ( scalar @keys ); 1101 1102 $log->debug( "leaving" ); 1103 1104 return \@keys; 1105 } 1106 1107 # find matching objects from the given server 1108 sub find_objects_for_index 1109 { 1110 1111 my $self = shift; 1112 my $index = shift; 1113 my $pattern = shift; 1114 my $db = $self->db_for_index($index); 1166 1115 1167 1116 my @keys; … … 1177 1126 $log->logdie("database error: $@") if $@; 1178 1127 1179 $log->logdie("no keys found") unless ( scalar @keys );1180 1181 $log->debug( "leaving" );1182 1183 1128 return \@keys; 1184 1129 } 1185 1186 1130 1187 1131 sub find_instances … … 1416 1360 } 1417 1361 1418 1362 # this should have a 'db_index' as an argument 1419 1363 sub mounts 1420 1364 { 1421 1365 # XXX: this will only pull the mounts from one db 1366 # XXX: loop over db_index and generate a single unique list 1367 # XXX: or report mount info for each db server 1422 1368 my $self = shift; 1423 1369 … … 1426 1372 my $log = $self->log; 1427 1373 my $sql = $self->sql; 1428 my $db = $self->db ;1374 my $db = $self->db_for_index(0); # XXX fix as above 1429 1375 1430 1376 $log->debug("entered - @_"); … … 1772 1718 $log->debug( "entered" ); 1773 1719 1720 # XXX do we need to loop over db_index? 1774 1721 # $self->db->disconnect; 1775 1722
Note:
See TracChangeset
for help on using the changeset viewer.
