1717 */
1818package org .apache .hadoop .hbase .mapreduce ;
1919
20+ import static org .apache .hadoop .hbase .mapreduce .HFileOutputFormat2 .MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY ;
2021import static org .hamcrest .CoreMatchers .equalTo ;
2122import static org .hamcrest .CoreMatchers .notNullValue ;
2223import static org .hamcrest .CoreMatchers .nullValue ;
3435import java .io .File ;
3536import java .io .IOException ;
3637import java .io .PrintStream ;
38+ import java .nio .charset .StandardCharsets ;
3739import java .util .ArrayList ;
3840import java .util .concurrent .ThreadLocalRandom ;
3941import org .apache .hadoop .conf .Configuration ;
4648import org .apache .hadoop .hbase .HBaseTestingUtil ;
4749import org .apache .hadoop .hbase .HConstants ;
4850import org .apache .hadoop .hbase .KeyValue ;
51+ import org .apache .hadoop .hbase .NamespaceDescriptor ;
4952import org .apache .hadoop .hbase .SingleProcessHBaseCluster ;
5053import org .apache .hadoop .hbase .TableName ;
5154import org .apache .hadoop .hbase .client .Delete ;
@@ -90,13 +93,20 @@ public class TestWALPlayer {
9093 public static final HBaseClassTestRule CLASS_RULE =
9194 HBaseClassTestRule .forClass (TestWALPlayer .class );
9295
96+ private static final byte [] FAMILY = Bytes .toBytes ("family" );
97+ private static final byte [] COLUMN1 = Bytes .toBytes ("c1" );
98+ private static final byte [] COLUMN2 = Bytes .toBytes ("c2" );
99+ private static final byte [] ROW = Bytes .toBytes ("row" );
100+
93101 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil ();
94102 private static SingleProcessHBaseCluster cluster ;
95103 private static Path rootDir ;
96104 private static Path walRootDir ;
97- private static FileSystem fs ;
105+ private static FileSystem localFs ;
98106 private static FileSystem logFs ;
99107 private static Configuration conf ;
108+ private static FileSystem hdfs ;
109+ private static String bulkLoadOutputDir ;
100110
101111 @ Rule
102112 public TestName name = new TestName ();
@@ -106,15 +116,18 @@ public static void beforeClass() throws Exception {
106116 conf = TEST_UTIL .getConfiguration ();
107117 rootDir = TEST_UTIL .createRootDir ();
108118 walRootDir = TEST_UTIL .createWALRootDir ();
109- fs = CommonFSUtils .getRootDirFileSystem (conf );
119+ localFs = CommonFSUtils .getRootDirFileSystem (conf );
110120 logFs = CommonFSUtils .getWALFileSystem (conf );
111121 cluster = TEST_UTIL .startMiniCluster ();
122+ hdfs = TEST_UTIL .getTestFileSystem ();
123+ bulkLoadOutputDir = new Path (new Path (TEST_UTIL .getConfiguration ().get ("fs.defaultFS" )),
124+ Path .SEPARATOR + "bulkLoadOutput" ).toString ();
112125 }
113126
114127 @ AfterClass
115128 public static void afterClass () throws Exception {
116129 TEST_UTIL .shutdownMiniCluster ();
117- fs .delete (rootDir , true );
130+ localFs .delete (rootDir , true );
118131 logFs .delete (walRootDir , true );
119132 }
120133
@@ -235,18 +248,11 @@ public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
235248 public void testWALPlayer () throws Exception {
236249 final TableName tableName1 = TableName .valueOf (name .getMethodName () + "1" );
237250 final TableName tableName2 = TableName .valueOf (name .getMethodName () + "2" );
238- final byte [] FAMILY = Bytes .toBytes ("family" );
239- final byte [] COLUMN1 = Bytes .toBytes ("c1" );
240- final byte [] COLUMN2 = Bytes .toBytes ("c2" );
241- final byte [] ROW = Bytes .toBytes ("row" );
242251 Table t1 = TEST_UTIL .createTable (tableName1 , FAMILY );
243252 Table t2 = TEST_UTIL .createTable (tableName2 , FAMILY );
244253
245- // put a row into the first table
246- Put p = new Put (ROW );
247- p .addColumn (FAMILY , COLUMN1 , COLUMN1 );
248- p .addColumn (FAMILY , COLUMN2 , COLUMN2 );
249- t1 .put (p );
254+ putRowIntoTable (t1 );
255+
250256 // delete one column
251257 Delete d = new Delete (ROW );
252258 d .addColumns (FAMILY , COLUMN1 );
@@ -411,6 +417,109 @@ public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception {
411417 assertNotEquals ("WALPlayer should fail on empty files when not ignored" , 0 , exitCode );
412418 }
413419
420+ /**
421+ * Verifies the HFile output format for WALPlayer has the following directory structure when
422+ * hbase.mapreduce.use.multi.table.hfileoutputformat is set to true:
423+ * .../BULK_OUTPUT_CONF_KEY/namespace/tableName/columnFamily
424+ */
425+ @ Test
426+ public void testWALPlayerMultiTableHFileOutputFormat () throws Exception {
427+ String namespace = "ns_" + name .getMethodName ();
428+ TEST_UTIL .getAdmin ().createNamespace (NamespaceDescriptor .create (namespace ).build ());
429+ final TableName tableName1 = TableName .valueOf (name .getMethodName () + "1" );
430+ final TableName tableName2 = TableName .valueOf (namespace , name .getMethodName () + "2" );
431+ Table t1 = TEST_UTIL .createTable (tableName1 , FAMILY );
432+ Table t2 = TEST_UTIL .createTable (tableName2 , FAMILY );
433+
434+ putRowIntoTable (t1 );
435+ putRowIntoTable (t2 );
436+
437+ Configuration multiTableOutputConf = new Configuration (conf );
438+ setConfSimilarToIncrementalBackupWALToHFilesMethod (multiTableOutputConf );
439+
440+ // We are testing this config variable's effect on HFile output for the WALPlayer
441+ multiTableOutputConf .setBoolean (MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY , true );
442+
443+ WALPlayer player = new WALPlayer (multiTableOutputConf );
444+ String walInputDir = new Path (cluster .getMaster ().getMasterFileSystem ().getWALRootDir (),
445+ HConstants .HREGION_LOGDIR_NAME ).toString ();
446+ String tables = tableName1 .getNameAsString () + "," + tableName2 .getNameAsString ();
447+
448+ ToolRunner .run (multiTableOutputConf , player , new String [] { walInputDir , tables });
449+
450+ assertMultiTableOutputFormatDirStructure (tableName1 , "default" );
451+ assertMultiTableOutputFormatDirStructure (tableName2 , namespace );
452+
453+ hdfs .delete (new Path (bulkLoadOutputDir ), true );
454+ }
455+
456+ /**
457+ * Verifies the HFile output format for WALPlayer has the following directory structure when
458+ * hbase.mapreduce.use.multi.table.hfileoutputformat is set to false:
459+ * .../BULK_OUTPUT_CONF_KEY/columnFamily. Also verifies an exception occurs when the WALPlayer is
460+ * run on multiple tables at once while hbase.mapreduce.use.multi.table.hfileoutputformat is set
461+ * to false.
462+ */
463+ @ Test
464+ public void testWALPlayerSingleTableHFileOutputFormat () throws Exception {
465+ String namespace = "ns_" + name .getMethodName ();
466+ TEST_UTIL .getAdmin ().createNamespace (NamespaceDescriptor .create (namespace ).build ());
467+ final TableName tableName1 = TableName .valueOf (name .getMethodName () + "1" );
468+ final TableName tableName2 = TableName .valueOf (namespace , name .getMethodName () + "2" );
469+ Table t1 = TEST_UTIL .createTable (tableName1 , FAMILY );
470+ Table t2 = TEST_UTIL .createTable (tableName2 , FAMILY );
471+
472+ putRowIntoTable (t1 );
473+ putRowIntoTable (t2 );
474+
475+ String bulkLoadOutputDir = new Path (new Path (TEST_UTIL .getConfiguration ().get ("fs.defaultFS" )),
476+ Path .SEPARATOR + "bulkLoadOutput" ).toString ();
477+
478+ Configuration singleTableOutputConf = new Configuration (conf );
479+ setConfSimilarToIncrementalBackupWALToHFilesMethod (singleTableOutputConf );
480+
481+ // We are testing this config variable's effect on HFile output for the WALPlayer
482+ singleTableOutputConf .setBoolean (MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY , false );
483+
484+ WALPlayer player = new WALPlayer (singleTableOutputConf );
485+
486+ String walInputDir = new Path (cluster .getMaster ().getMasterFileSystem ().getWALRootDir (),
487+ HConstants .HREGION_LOGDIR_NAME ).toString ();
488+ String tables = tableName1 .getNameAsString () + "," + tableName2 .getNameAsString ();
489+
490+ // Expecting a failure here since we are running WALPlayer on multiple tables even though the
491+ // multi-table HFile output format is disabled
492+ try {
493+ ToolRunner .run (singleTableOutputConf , player , new String [] { walInputDir , tables });
494+ fail ("Expected a failure to occur due to using WALPlayer with multiple tables while having "
495+ + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " set to false" );
496+ } catch (IOException e ) {
497+ String expectedMsg = "Expected table names list to have only one table since "
498+ + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. Got the following "
499+ + "list of tables instead: [testWALPlayerSingleTableHFileOutputFormat1, " + namespace
500+ + ":testWALPlayerSingleTableHFileOutputFormat2]" ;
501+ assertTrue (e .getMessage ().contains (expectedMsg ));
502+ }
503+
504+ // Successfully run WALPlayer on just one table while having multi-table HFile output format
505+ // disabled
506+ ToolRunner .run (singleTableOutputConf , player ,
507+ new String [] { walInputDir , tableName1 .getNameAsString () });
508+
509+ Path bulkLoadOutputDirForTable = new Path (bulkLoadOutputDir , "family" );
510+ assertTrue ("Expected path to exist: " + bulkLoadOutputDirForTable ,
511+ hdfs .exists (bulkLoadOutputDirForTable ));
512+
513+ hdfs .delete (new Path (bulkLoadOutputDir ), true );
514+ }
515+
516+ private void putRowIntoTable (Table table ) throws IOException {
517+ Put p = new Put (ROW );
518+ p .addColumn (FAMILY , COLUMN1 , COLUMN1 );
519+ p .addColumn (FAMILY , COLUMN2 , COLUMN2 );
520+ table .put (p );
521+ }
522+
414523 private Path createEmptyWALFile (String walDir ) throws IOException {
415524 FileSystem dfs = TEST_UTIL .getDFSCluster ().getFileSystem ();
416525 Path inputDir = new Path ("/" + walDir );
@@ -422,4 +531,22 @@ private Path createEmptyWALFile(String walDir) throws IOException {
422531
423532 return inputDir ;
424533 }
534+
535+ private void setConfSimilarToIncrementalBackupWALToHFilesMethod (Configuration conf ) {
536+ conf .set (WALPlayer .BULK_OUTPUT_CONF_KEY , bulkLoadOutputDir );
537+ conf .set (WALPlayer .INPUT_FILES_SEPARATOR_KEY , ";" );
538+ conf .setBoolean (WALPlayer .MULTI_TABLES_SUPPORT , true );
539+ conf .set ("mapreduce.job.name" , name .getMethodName () + "-" + System .currentTimeMillis ());
540+ conf .setBoolean (HFileOutputFormat2 .DISK_BASED_SORTING_ENABLED_KEY , true );
541+ }
542+
543+ private void assertMultiTableOutputFormatDirStructure (TableName tableName , String namespace )
544+ throws IOException {
545+ Path qualifierAndFamilyDir =
546+ new Path (tableName .getQualifierAsString (), new String (FAMILY , StandardCharsets .UTF_8 ));
547+ Path namespaceQualifierFamilyDir = new Path (namespace , qualifierAndFamilyDir );
548+ Path bulkLoadOutputDirForTable = new Path (bulkLoadOutputDir , namespaceQualifierFamilyDir );
549+ assertTrue ("Expected path to exist: " + bulkLoadOutputDirForTable ,
550+ hdfs .exists (bulkLoadOutputDirForTable ));
551+ }
425552}
0 commit comments