Formulário de contato

Nome

E-mail *

Mensagem *

Este blog é um complemento do nosso canal no YouTube. Clique em @CanalQb para seguir e acompanhar nossos vídeos!
Imagem

Validação de Chaves Bitcoin SV em Python com PostgreSQL


@CanalQb no YouTube


@CanalQb

Como Usar a Testnet Chainphon e se Preparar para o Lançamento Oficial


Sempre crie uma frase de segurança única para jogos, testnets ou airdrops e evite usar sua carteira principal.



Como Conectar e Gerenciar Chaves Bitcoin SV com PostgreSQL e Python

Este artigo apresenta um guia completo para conectar-se a um banco de dados PostgreSQL usando Python, manipular intervalos de chaves privadas Bitcoin SV, e realizar operações de verificação paralela utilizando múltiplas threads. O código ilustrado exemplifica boas práticas em conexão de banco, manipulação de dados, uso de criptografia, e otimização para operações intensivas.

Configuração Inicial e Conexão ao Banco de Dados

O script começa limpando o terminal para facilitar a leitura das saídas, seguido pela definição da URL do banco de dados com os parâmetros necessários para a conexão segura:

DATABASE_URL = "postgresql://neondb_owner:.....us-east-1.aws.neon.tech/neondb?sslmode=require"

Para conectar, a função conectar_banco() utiliza a biblioteca pg8000 e um contexto SSL para garantir segurança na comunicação.

Manipulação das Tabelas e Dados no Banco

O script implementa funções para criar tabelas específicas dinamicamente, armazenar subintervalos de chaves privadas em hexadecimal, e selecionar linhas aleatórias para processamento incremental:

  • criar_tabela(table_name): cria a tabela, se não existir, e adiciona índice único para garantir integridade.
  • inserir_partes(...): insere intervalos de subranges com tentativas de reprocessamento em caso de falhas.
  • escolher_linha_aleatoria(table_name): retorna uma linha aleatória para avaliação de progresso, respeitando intervalos de tempo para evitar duplicações.

Geração e Validação de Chaves Bitcoin SV

Utilizando a curva elíptica SECP256k1 e funções de hashing padrão (sha256 e ripemd160), o código converte uma chave privada para uma chave pública comprimida, depois gera o endereço Bitcoin SV correspondente:

  • private_key_to_public_key_compressed(private_key): cria a chave pública comprimida.
  • public_key_to_address(public_key): gera o endereço Bitcoin SV codificado em Base58Check.
  • is_valid_private_key(private_key_int): verifica se a chave gera um endereço que começa com '1', indicando validade no padrão Bitcoin.

Processamento Paralelo para Alta Performance

O script divide o intervalo de chaves em partes iguais para processamento em paralelo usando concurrent.futures.ThreadPoolExecutor. Essa abordagem melhora o desempenho significativamente para verificação de grandes volumes de chaves.

  • num_threads: Número de threads configuráveis para aproveitar o poder do processador.
  • Divisão do intervalo para cada thread executar a função verificar_chaves_no_intervalo().

Exemplo de Uso e Resultados

Após executar a função principal, o usuário seleciona o puzzle (identificador) que deseja trabalhar. O script exibe informações sobre subranges e progresso e então inicia a análise das chaves, mostrando o total de chaves válidas e inválidas no final.

Esse fluxo é ideal para quem trabalha com segurança em blockchain, desenvolve ferramentas de análise de chaves, ou gerencia bancos de dados de puzzles criptográficos.

Considerações Importantes

  • Este código serve para fins educacionais e de desenvolvimento, não sendo recomendado o uso em produção sem adaptações de segurança adequadas.
  • É fundamental entender profundamente o funcionamento de chaves privadas antes de utilizar este script para manipulação real de carteiras.
  • Para projetos relacionados a investimentos ou gerenciamento de ativos digitais, é imprescindível realizar análises próprias e responsáveis, evitando riscos financeiros desnecessários.

Referências e Ferramentas

Segue o Script   

import pg8000
import urllib.parse
import random
import time
import os
import datetime
import gc
import hashlib
from ecdsa import SECP256k1
import sys
import concurrent.futures

# Limpa o terminal para uma visualização mais limpa
os.system('cls' if os.name == 'nt' else 'clear')

# URL do banco de dados
DATABASE_URL = "postgresql://neondb_owner:.....us-east-1.aws.neon.tech/neondb?sslmode=require"
url = urllib.parse.urlparse(DATABASE_URL)

# Função para conectar ao banco de dados
def conectar_banco():
    try:
        return pg8000.connect(
            user=url.username,
            password=url.password,
            host=url.hostname,
            port=5432,
            database=url.path[1:],
            ssl_context=True
        )
    except Exception as e:
        print(f"Erro ao conectar ao banco de dados: {e}")
        raise

# Função para buscar as identificações dos puzzles
def buscar_identificacoes():
    try:
        with conectar_banco() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT identificacao FROM lista_puzzle;")
            return [puzzle[0] for puzzle in cursor.fetchall()]
    except Exception as e:
        print(f"Erro ao buscar identificações: {e}")
        return []

# Função para buscar o puzzle selecionado
def buscar_puzzle(user_choice):
    try:
        with conectar_banco() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM lista_puzzle WHERE identificacao = %s;", (user_choice,))
            return cursor.fetchone()
    except Exception as e:
        print(f"Erro ao buscar puzzle: {e}")
        return None

# Função para criar a tabela no banco de dados
def criar_tabela(table_name):
    try:
        with conectar_banco() as conn:
            cursor = conn.cursor()
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    id SERIAL PRIMARY KEY,
                    inicio_subrange TEXT,
                    final_subrange TEXT,
                    progresso_subrange TEXT,
                    datahora TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
            cursor.execute(f"""
                CREATE UNIQUE INDEX IF NOT EXISTS unique_inicio_subrange ON {table_name} (inicio_subrange);
            """)
            conn.commit()
    except Exception as e:
        print(f"Erro ao criar tabela {table_name}: {e}")

# Função para inserir partes na tabela
def inserir_partes(table_name, indices_aleatorios, inicio_intervalo, tamanho_parte, num_partes):
    tentativas = 0
    sucesso = False
    while tentativas < 3 and not sucesso:
        parte_inicio = inicio_intervalo + (indices_aleatorios[0] * tamanho_parte)
        parte_fim = parte_inicio + tamanho_parte
        parte_inicio_hex = hex(parte_inicio)
        parte_fim_hex = hex(parte_fim)
        
        try:
            with conectar_banco() as conn:
                cursor = conn.cursor()
                cursor.execute(f"SELECT to_regclass('{table_name}')")
                if cursor.fetchone()[0] is None:
                    criar_tabela(table_name)
                
                cursor.execute(f"SELECT * FROM {table_name} WHERE inicio_subrange = %s;", (parte_inicio_hex,))
                if cursor.fetchone() is None:
                    datahora_atual = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
                    cursor.execute(f"""
                        INSERT INTO {table_name} (inicio_subrange, final_subrange, progresso_subrange, datahora)
                        VALUES (%s, %s, %s, %s);
                    """, (parte_inicio_hex, parte_fim_hex, None, datahora_atual))
                    sucesso = True
        except Exception as e:
            print(f"Erro ao inserir partes na tabela {table_name}: {e}")
            tentativas += 1
            indices_aleatorios = random.sample(range(num_partes), 1)

# Função para escolher uma linha aleatória da tabela
def escolher_linha_aleatoria(table_name):
    try:
        with conectar_banco() as conn:
            cursor = conn.cursor()
            cursor.execute(f"SELECT inicio_subrange, final_subrange, progresso_subrange, datahora FROM {table_name} ORDER BY RANDOM() LIMIT 1;")
            return cursor.fetchone()
    except Exception as e:
        print(f"Erro ao escolher linha aleatória: {e}")
        return None

# Função para preencher variáveis com base na linha aleatória escolhida
def procedimento_preencher_variaveis(table_name):
    tentativas = 0
    while tentativas < 20:
        random_row = escolher_linha_aleatoria(table_name)
        if not random_row:
            return None
        
        datahora_db = random_row[3]
        if not datahora_db:
            return None

        if isinstance(datahora_db, str):
            datahora_db = datetime.datetime.strptime(datahora_db, '%Y-%m-%d %H:%M:%S')

        if not isinstance(datahora_db, datetime.datetime):
            return None

        tempo_atual = datetime.datetime.utcnow()
        delta_tempo = tempo_atual - datahora_db
        segundos_diff = delta_tempo.total_seconds()

        if segundos_diff >= 60:
            return random_row

        tentativas += 1
        time.sleep(1)
    
    return None

# Função para gerar a chave pública a partir de uma chave privada (comprimida)
def private_key_to_public_key_compressed(private_key):
    #print(f'0x{hex(private_key)[2:].zfill(64)}')
    public_key_point = SECP256k1.generator * private_key
    if public_key_point.y() % 2 == 0:
        public_key_bytes = b'\x02' + public_key_point.x().to_bytes(32, byteorder='big')
    else:
        public_key_bytes = b'\x03' + public_key_point.x().to_bytes(32, byteorder='big')
    return public_key_bytes

# Função para gerar o endereço Bitcoin SV (com base na chave pública comprimida)
def public_key_to_address(public_key):
    sha256_hash = hashlib.sha256(public_key).digest()
    ripemd160_hash = hashlib.new("ripemd160", sha256_hash).digest()
    prefixed = b'\x00' + ripemd160_hash
    checksum = hashlib.sha256(hashlib.sha256(prefixed).digest()).digest()[:4]
    address = prefixed + checksum
    alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
    num = int.from_bytes(address, 'big')
    encoded_address = ''
    while num > 0:
        num, mod = divmod(num, 58)
        encoded_address = alphabet[mod] + encoded_address
    n = 0
    for byte in address:
        if byte == 0:
            n += 1
        else:
            break
    return alphabet[0] * n + encoded_address

# Função para verificar se uma chave privada é válida
def is_valid_private_key(private_key_int):
    if private_key_int <= 0 or private_key_int >= SECP256k1.order:
        return False
    public_key = private_key_to_public_key_compressed(private_key_int)
    address = public_key_to_address(public_key)
    return address[0] == '1'

# Função para verificar chaves em um intervalo
def verificar_chaves_no_intervalo(inicio, fim):
    valid_keys_local = 0
    invalid_keys_local = 0
    
    for private_key_int in range(inicio, fim + 1):
        if is_valid_private_key(private_key_int):
            valid_keys_local += 1
        else:
            invalid_keys_local += 1
    
    return valid_keys_local, invalid_keys_local

# Função principal do script
def main():
    identificacoes = buscar_identificacoes()
    colunas = 12
    linhas = len(identificacoes) // colunas + (1 if len(identificacoes) % colunas != 0 else 0)
    
    for i in range(linhas):
        linha = [str(identificacoes[i + j * linhas]) for j in range(colunas) if i + j * linhas < len(identificacoes)]
        print(" |\t ".join(linha))
    
    user_choice = input("\nQual puzzle deseja executar? (Escolha o identificador): ")
    selected_puzzle = buscar_puzzle(user_choice)
    
    if selected_puzzle:
        identificacao, inicio_range, fim_range, carteira, ativo = selected_puzzle
    else:
        print("Puzzle não encontrado!")
        return

    inicio_intervalo = int(f'0x{inicio_range}', 16)
    fim_intervalo = int(f'0x{fim_range}', 16)
    diferenca = fim_intervalo - inicio_intervalo
    num_partes = 1000000000
    tamanho_parte = diferenca // num_partes
    table_name = f"puzzle{identificacao}"

    criar_tabela(table_name)
    indices_aleatorios = random.sample(range(num_partes), 1)
    inserir_partes(table_name, indices_aleatorios, inicio_intervalo, tamanho_parte, num_partes)

    random_row = procedimento_preencher_variaveis(table_name)
    gc.collect()

    base_inicial = inicio_intervalo
    base_final = fim_intervalo

    if random_row:
        print("\nDados da linha com diferença de tempo maior que 1 minuto:")
        print(f"Início do Subrange: {random_row[0]}")
        print(f"Final do Subrange: {random_row[1]}")
        print(f"Progresso Subrange: {random_row[2]}")
        print(f"Data e Hora do Registro: {random_row[3]}")
        base_inicial = int(random_row[0], 16) if random_row[0] else base_inicial
        base_final = int(random_row[1], 16) if random_row[1] else base_final
    else:
        print("Nenhuma linha encontrada com a diferença de tempo requerida.")

    print(f"Base Inicial: {hex(base_inicial)} - Base Final: {hex(base_final)}")

    valid_keys = 0
    invalid_keys = 0
    start_time = time.time()

    total_keys = base_final - base_inicial + 1
    chaves_contar = 100000  # A cada 100.000 chaves, imprime o progresso

    # Aqui, dividimos o trabalho em múltiplos threads
    num_threads = 8  # Por exemplo, 8 threads (ajuste conforme seu sistema)
    step = (base_final - base_inicial) // num_threads

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for i in range(num_threads):
            start = base_inicial + i * step
            end = base_inicial + (i + 1) * step - 1 if i < num_threads - 1 else base_final
            futures.append(executor.submit(verificar_chaves_no_intervalo, start, end))

        for future in concurrent.futures.as_completed(futures):
            valid_keys_local, invalid_keys_local = future.result()
            valid_keys += valid_keys_local
            invalid_keys += invalid_keys_local

    # Exibe os resultados finais
    print(f"\nTotal de chaves válidas: {valid_keys}")
    print(f"Total de chaves inválidas: {invalid_keys}")

# Inicia o processo
if __name__ == "__main__":
    main()
 

Comentários