I would like to use GCP KMS to sign an Ethereum transaction and recover the sender address. However, weirdly I'm not getting the same signature when I provide the exact same transaction. I'm using a HSM key that uses the ec-sign-secp256k1-sha256
algorithm.
Following is the code I'm using to sign the transaction and recover the sender address:
from google.cloud import kms
from google.oauth2 import service_account
from crcmod.predefined import mkPredefinedCrcFun # type: ignore
from dotenv import load_dotenv
from json import loads
from web3 import Web3
from rlp import encode
from eth_utils import (
add_0x_prefix,
keccak,
remove_0x_prefix,
to_bytes,
to_int,
to_checksum_address,
int_to_big_endian,
big_endian_to_int,
)
from eth_keys import keys
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from pyasn1.type import univ, namedtype
from pyasn1.codec.der.decoder import decode
load_dotenv()
GOOGLE_APPLICATION_CREDENTIALS = loads(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
CREDENTIALS = service_account.Credentials.from_service_account_info(
GOOGLE_APPLICATION_CREDENTIALS
)
class EcdsaSig(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType("r", univ.Integer()),
namedtype.NamedType("s", univ.Integer()),
)
def decode_ecdsa_sig(self, asn_string_buffer):
ecdsa_sig, _ = decode(asn_string_buffer, asn1Spec=EcdsaSig())
return ecdsa_sig["r"], ecdsa_sig["s"]
class HSM:
client: kms.KeyManagementServiceClient
key_version_name: str
address: str
def __init__(
self,
project_id: str,
location_id: str,
key_ring_id: str,
key_id: str,
version_id: str,
) -> None:
self.client = kms.KeyManagementServiceClient(credentials=CREDENTIALS)
self.key_version_name = self.client.crypto_key_version_path(
project_id, location_id, key_ring_id, key_id, version_id
)
self.address = ""
def sign(self, transaction: dict) -> None:
message_bytes = to_bytes(hexstr=self.__rlp_encode_transaction(transaction))
hash_message = keccak(primitive=message_bytes)
signature, r, s = self.__request_kms_signature(hash_message)
self.__derive_ethereum_address()
_, v = self.__determine_correct_v(hash_message, r, s)
sig = self.__join_signature(r, s, v)
print(sig)
return sig
def derive_address(self) -> str:
self.__derive_ethereum_address()
return self.address
def __request_kms_signature(self, hashed_message: bytes) -> tuple[bytes, int, int]:
digest = {"sha256": hashed_message}
digest_crc32c = self.__crc32c(hashed_message)
sign_request = kms.AsymmetricSignRequest(
name=self.key_version_name, digest=digest, digest_crc32c=digest_crc32c
)
sign_response = self.client.asymmetric_sign(request=sign_request)
print(len(sign_response.signature.hex()))
r, s = self.__find_ethereum_sig(sign_response.signature)
return sign_response.signature, r, s
def __crc32c(self, data: bytes) -> int:
crc32c_fun = mkPredefinedCrcFun("crc-32c")
return crc32c_fun(data)
def __pem_to_der(self, pem_key: str) -> bytes:
# Deserialize PEM-encoded key from the provided string
private_key = serialization.load_pem_public_key(
pem_key.encode(), # Convert PEM string to bytes
backend=default_backend(),
)
# Serialize the key to DER format
der_key = private_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
return der_key
def __derive_ethereum_address(self) -> None:
if not self.address:
public_key = self.client.get_public_key(
request={"name": self.key_version_name}
)
der_public_key = self.__pem_to_der(public_key.pem)
eth_address = keccak(primitive=der_public_key)[-20:]
self.address = to_checksum_address(eth_address)
def __find_ethereum_sig(self, signature: bytes) -> tuple[int, int]:
r, s = EcdsaSig().decode_ecdsa_sig(signature)
secp256k1_N = int(
"fffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141", 16
)
s = secp256k1_N - s if s > secp256k1_N / 2 else s
return int(r), int(s)
def __determine_correct_v(self, message: bytes, r: int, s: int) -> tuple[str, int]:
v = 0
recoverd_address = self.__recover_pub_key_from_sig(message, r, s, v)
print(recoverd_address)
print(self.address)
if recoverd_address.lower() != self.address.lower():
# if the pub key for v = 27 does not match
# it has to be v = 28
v = 1
recoverd_address = self.__recover_pub_key_from_sig(message, r, s, v)
return recoverd_address, v + 27
def __recover_pub_key_from_sig(self, message: bytes, r: int, s: int, v: int) -> str:
signature = keys.Signature(vrs=(v, r, s))
public_key = signature.recover_public_key_from_msg(message)
address = public_key.to_checksum_address()
return address
def __join_signature(self, r: int, s: int, v: int) -> str:
# Ensure r, s, and v are integers
r, s, v = map(to_bytes, (r, s, v))
signature_hex = int_to_big_endian(big_endian_to_int(r + s + v)).hex()
return signature_hex
def __rlp_encode_transaction(self, tx: dict) -> str:
encoded_params = encode(
[
tx["chainId"],
tx["nonce"],
tx["maxPriorityFeePerGas"],
tx["maxFeePerGas"],
tx["gas"],
to_bytes(hexstr=tx["to"]),
tx["value"],
to_bytes(hexstr=tx["data"]),
[],
]
)
return self.__add_transaction_type(encoded_params.hex())
def __add_transaction_type(self, payload: str) -> str:
return f"02{remove_0x_prefix(payload)}"
w3 = Web3(
Web3.HTTPProvider(
"RPC_URL"
)
)
hsm = HSM(
"innovation-sandbox-1",
"southamerica-east1",
"my-key-ring",
"solidity",
"1",
)
# Example Ethereum transaction data
nonce = w3.eth.get_transaction_count("ONE_ADDRESS")
gas_price = w3.to_wei("20", "gwei")
gas_limit = 21000
to_address = "ANOTHER_ADDRESS"
value = w3.to_wei("1", "ether")
# Create the Ethereum transaction
transaction = {
"chainId": w3.eth.chain_id,
"nonce": nonce,
"maxPriorityFeePerGas": w3.eth.max_priority_fee,
"maxFeePerGas": gas_price,
"gas": gas_limit,
"to": to_address,
"value": value,
"data": "0x",
}
hsm.sign(transaction)
I would really appreciate if someone could help me with this. Thank you for your time.