how to make live stream function works with a generated variable each time?

34 views Asked by At

I want to run a binance live stream function that get a generated symbol from another function each time. 1- top gainer function: scan the market and get top price gainer coin each certain time. 2- live stream run function: get the symbol from the first function then start websocket to pull data each time with a new symbol comes from top gainer function.

here is the code:

<

import json,time, asyncio,websockets
import pandas as pd
from binance.client import Client
import numpy as np

client = Client(api_key=pkey, api_secret=skey, tld='com', testnet=True)

interval = '1m'

data=pd.DataFrame(columns=['Date','Coin','Open', 'High', 'Low', 'Close', 'Volume', 'no_trades','Quote_Volume','Complete'])

async def top_3gainer_coins():  
    try:
        print("Updating top gainer coins...")
        time.sleep(90)
        exchange_info = client.get_exchange_info()
        spot_symbols = [
            symbol['symbol'] for symbol in exchange_info['symbols'] if symbol['isSpotTradingAllowed']
            and symbol['quoteAsset'] == 'USDT'
            and symbol['status'] == 'TRADING'  # Ensure the symbol is actively trading
            ]
        all_data = []
        for symbol in spot_symbols:
            try:
                data = (pd.DataFrame(client.get_klines(symbol=symbol, interval='5m', limit=12))).copy()
                data = data.iloc[:, [0, 4, 5, 8]]  # Keep only columns with timestamp, open, close, and volume
                data.columns = ['Date', 'Close', 'Volume', 'NoTrades']
                data['Date'] = pd.to_datetime(data['Date'], unit='ms')
                for column in data.columns[1:]:
                    data[column] = pd.to_numeric(data[column])
                data['pricegain'] = (data.Close.pct_change() * 100).fillna(0)
                data['VGain'] = (data.Volume.pct_change() * 100).fillna(0)
                data['TPS'] = (data.NoTrades / 60).fillna(0)
                data.insert(1, 'Symbol', symbol)
                data = data.iloc[-2:]
                data = data.replace([np.inf, -np.inf], np.nan)  # Replace inf with NaN
                data = data.reset_index(drop=True)  # Reset the index for each symbol's data
                all_data.append(data)
            except Exception as e:
                print(f"Error fetching data for symbol {symbol}: {e}")
                continue
        if all_data:
            all_data = pd.concat(all_data)
            all_data = all_data.sort_values(by='pricegain', ascending=False)
            all_data.reset_index(drop=True, inplace=True)
            all_data.index += 1

            top_gainer = all_data.head(1)      # Select the top 3 gainers            
            top_gainer_coins = top_gainer['Symbol'].tolist()   # Get the symbols of the top 3 gainers
            print(top_gainer_coins)
            print(f"Top 3 gainer coins\n{top_gainer.to_string()}")
            
    except Exception as e:
        print(f"Error: {e}") 
    return top_gainer_coins

def on_open(ws):
    #print(f'Opened connection for {ws.symbol}')
    pass

def on_close(ws):
    print(f'Closed connection for {ws.symbol}')

async def on_message(ws, msg):
    try:
        msg = json.loads(msg)
        start_time = pd.to_datetime(msg['k']['t'], unit='ms')
        symbol = msg['s']
        open_price = float(msg['k']['o'])
        high = float(msg['k']['h'])
        low = float(msg['k']['l'])
        close = float(msg['k']['c'])
        volume = float(msg['k']['v'])
        no_trades = float(msg['k']['n'])
        quote_volume = float(msg['k']['q'])
        complete = msg['k']['x']

        if complete:
            data.loc[len(data)] = [start_time, symbol, open_price, high, low, close, volume, no_trades, quote_volume, complete]
            print(data)
    except Exception as e:
        print(f"Error processing message for symbol {symbol}: {e}")
async def run(symbol):
    print('system starts')
    try:  
        symbol= symbol.lower()
        print(f'Start working on coin: {symbol}')
        socket = f"wss://stream.binance.com:9443/ws/{symbol}@kline_{interval}"
        async with websockets.connect(socket) as ws:
            while True:
                response = await ws.recv()
                on_message(ws, response)
                await asyncio.sleep(1)
    except Exception as e:
        print(f'error on run : {e}')
        pass
async def main():
    while True:
        try:
            symbols =await top_3gainer_coins()
            if symbols:
                # Create a list of tasks for the run function with the symbols from top gainers
                run_tasks = [asyncio.create_task(run(symbol)) for symbol in symbols]
                await asyncio.gather(*run_tasks)  # Wait for all run tasks to complete
        except Exception as e:
            print(f'error: {e}')

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f'error: {e}')
0

There are 0 answers