Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions phoenix-core-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,23 @@ public static long getMaxLookbackInMillis(Configuration conf) {

/** Exposed for testing */
public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";

/**
* PhoenixSyncTableTool scan attributes for server-side chunk formation and checksum
*/
public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";
public static final String SYNC_TABLE_CHUNK_SIZE_BYTES = "_SyncTableChunkSizeBytes";
public static final String SYNC_TABLE_CONTINUED_DIGEST_STATE = "_SyncTableContinuedDigestState";

/**
* PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between
* PhoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo missing 'P'

* mapper). The coprocessor returns chunk metadata as HBase cells with these qualifiers, and the
* mapper parses them to extract chunk information.
*/
public static final byte[] SYNC_TABLE_START_KEY_QUALIFIER = Bytes.toBytes("START_KEY");
public static final byte[] SYNC_TABLE_HASH_QUALIFIER = Bytes.toBytes("HASH");
public static final byte[] SYNC_TABLE_ROW_COUNT_QUALIFIER = Bytes.toBytes("ROW_COUNT");
public static final byte[] SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER =
Bytes.toBytes("IS_PARTIAL_CHUNK");
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ public interface QueryServices extends SQLCloseable {

public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex";

// Timeout config for PhoenixSyncTableTool
public static final String SYNC_TABLE_QUERY_TIMEOUT_ATTRIB = "phoenix.sync.table.query.timeout";
public static final String SYNC_TABLE_RPC_TIMEOUT_ATTRIB = "phoenix.sync.table.rpc.timeout";
public static final String SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB =
"phoenix.sync.table.client.scanner.timeout";
public static final String SYNC_TABLE_RPC_RETRIES_COUNTER =
"phoenix.sync.table.rpc.retries.counter";

// Retries when doing server side writes to SYSTEM.CATALOG
public static final String METADATA_WRITE_RETRIES_NUMBER = "phoenix.metadata.rpc.retries.number";
public static final String METADATA_WRITE_RETRY_PAUSE = "phoenix.metadata.rpc.pause";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ public class QueryServicesOptions {
// hrs
public static final long DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD = 30000; // 30 secs

// 30 min scan timeout * 5 tries, with 2100ms total pause time between retries
public static final long DEFAULT_SYNC_TABLE_QUERY_TIMEOUT = (5 * 30000 * 60) + 2100;
public static final long DEFAULT_SYNC_TABLE_RPC_TIMEOUT = 30000 * 60; // 30 mins
public static final long DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins
public static final int DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level

/**
* HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
* and give some room for things in the middle
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.Bytes;
import org.bouncycastle.crypto.digests.SHA256Digest;

/**
* Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk
* bundled SHA, since their digest can't be serialized/deserialized which is needed for
* PhoenixSyncTableTool for cross-region hash continuation.
*/
public class SHA256DigestUtil {

  /**
   * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up
   * to 128 bytes as buffer.
   */
  public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128;

  // Static utility class; not meant to be instantiated.
  private SHA256DigestUtil() {
  }

  /**
   * Encodes a SHA256Digest state to a byte array with length prefix for validation. Format: [4-byte
   * integer length][encoded digest state bytes]
   * @param digest The digest whose state should be encoded
   * @return Byte array containing integer length prefix + encoded state
   */
  public static byte[] encodeDigestState(SHA256Digest digest) {
    byte[] encoded = digest.getEncodedState();
    ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length);
    buffer.putInt(encoded.length);
    buffer.put(encoded);
    return buffer.array();
  }

  /**
   * Decodes a SHA256Digest state from a byte array.
   * @param encodedState Byte array containing 4-byte integer length prefix + encoded state
   * @return SHA256Digest restored to the saved state
   * @throws IOException if state is truncated or corrupted
   * @throws IllegalArgumentException if encodedState is null or the length prefix is out of range
   */
  public static SHA256Digest decodeDigestState(byte[] encodedState) throws IOException {
    if (encodedState == null) {
      throw new IllegalArgumentException("Invalid encoded digest state: encodedState is null");
    }

    // try-with-resources: harmless for an in-memory stream but keeps the resource idiom uniform
    try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(encodedState))) {
      int stateLength = dis.readInt();
      // Reject negative lengths (corrupted prefix would otherwise throw
      // NegativeArraySizeException below) and prevent malicious large allocations
      if (stateLength < 0 || stateLength > MAX_SHA256_DIGEST_STATE_SIZE) {
        throw new IllegalArgumentException(
          String.format("Invalid SHA256 state length: %d, expected in [0, %d]", stateLength,
            MAX_SHA256_DIGEST_STATE_SIZE));
      }

      byte[] state = new byte[stateLength];
      dis.readFully(state);
      return new SHA256Digest(state);
    }
  }

  /**
   * Decodes a digest state and finalizes it to produce the SHA-256 checksum.
   * @param encodedState Serialized digest state (format: [4-byte length][state bytes])
   * @return 32-byte SHA-256 hash
   * @throws IOException if state decoding fails
   */
  public static byte[] finalizeDigestToChecksum(byte[] encodedState) throws IOException {
    SHA256Digest digest = decodeDigestState(encodedState);
    return finalizeDigestToChecksum(digest);
  }

  /**
   * Finalizes a SHA256Digest to produce the final checksum.
   * @param digest The digest to finalize
   * @return 32-byte SHA-256 hash
   */
  public static byte[] finalizeDigestToChecksum(SHA256Digest digest) {
    byte[] hash = new byte[digest.getDigestSize()];
    digest.doFinal(hash, 0);
    return hash;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,10 @@ public static boolean isIndexRebuild(Scan scan) {
return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null;
}

/**
 * Returns true when the scan carries the PhoenixSyncTableTool chunk-formation attribute,
 * i.e. the server side should form chunks for this scan.
 */
public static boolean isSyncTableChunkFormation(Scan scan) {
  byte[] chunkFormationAttr =
    scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION);
  return chunkFormationAttr != null;
}

public static int getClientVersion(Scan scan) {
int clientVersion = UNKNOWN_CLIENT_VERSION;
byte[] clientVersionBytes =
Expand Down
6 changes: 5 additions & 1 deletion phoenix-core-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,12 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
Loading