【Python】pubnub と WebSocket で更にリアルタイムで情報を取得する

2018年3月22日Python,開発

おはようございます。

えーと、SQLiteにデータ登録なんかしようと思ってたんですが、
どう実装すればいいか分からず別の方法で実装した、pubnub で ティッカー情報が更新される度に画面の情報も更新する方法が分かりましたのでやってみました。

プログラムは前回のものを流用します。

【Python】bitflyer の API を使って注文を送信する

スポンサーリンク

画面の修正

Main.html

ティッカー情報の配信を開始するボタンと停止するボタンを追加しました。

<div style="clear:both; padding-top:10px;">
   <div class="entry_title">
      <div class="pull_left">ティッカー情報</div>
      <div class="pull_right">
         <input type="button" value="更新開始" />
         <input type="button" value="更新停止" />
      </div>
   </div>
   <table id="tickerTable">
      <tr class="header">
         <th style="width:5%">種別</th>
         <th style="width:10%">時刻</th>
         <th style="width:5%">ID</th>
         <th style="width:5%">売値</th>
         <th style="width:5%">買値</th>
         <th style="width:10%">売り数量</th>
         <th style="width:10%">買い数量</th>
         <th style="width:10%">売り注文総数</th>
         <th style="width:10%">買い注文総数</th>
         <th style="width:10%">最終取引価格</th>
         <th style="width:10%">出来高</th>
         <th style="width:10%">価格単位出来高</th>
      </tr>
   </table>
</div>

プログラムの修正

APIの修正

BfApi.py

インポートするクラス

import hashlib
import hmac
import json
import logging
import requests
import time
import urllib


from common.Constants import Constants

from pubnub.callbacks import SubscribeCallback
from pubnub.enums import PNStatusCategory
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub import SubscribeListener
from pubnub.pubnub_tornado import PubNubTornado

追加するメソッド、内部クラス

def start_pub_nub_ticker(self, cl, channels):
    """
    ティッカー情報の配信を開始
    :param cl: web_socket client
    :param channels: 配信するチャンネル
    :return:
    """

    self.listener = self.MySubscriberCallback(cl)
    self.pub_nub.add_listener(self.listener)
    self.pub_nub.subscribe().channels(channels).execute()

def stop_pub_nub_ticker(self, channels):
    """
    ティッカー情報の配信を停止
    :param channels: 停止するチャンネル
    :return:
    """
    self.pub_nub.unsubscribe().channels(channels).execute()
    self.pub_nub.remove_listener(self.listener)

class MySubscriberCallback(SubscribeCallback):
    """
    Pubnub登録のコールバッククラス
    """

    def __init__(self, clent):
        self.cl = clent

    def presence(self, pubnub, presence):
        pass  # handle incoming presence data

    def status(self, pubnub, status):
        if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
            pass  # This event happens when radio / connectivity is lost

        elif status.category == PNStatusCategory.PNConnectedCategory:
            # Connect event. You can do stuff like publish, and know you'll get it.
            # Or just use the connected event to confirm you are subscribed for
            # UI / internal notifications, etc
            pass
        elif status.category == PNStatusCategory.PNReconnectedCategory:
            pass
            # Happens as part of our regular operation. This event happens when
            # radio / connectivity is lost, then regained.
        elif status.category == PNStatusCategory.PNDecryptionErrorCategory:
            pass
            # Handle message decryption error. Probably client configured to
            # encrypt messages and on live data feed it received plain text.

    def message(self, pubnub, message):
        """
        登録したチャンネルからメッセージを受信した際の処理
        :param pubnub:
        :param message:
        :return:
        """
        # WEBソケットを利用してクライアントに配信
        for c in self.cl:
            c.write_message(message.message)

メインクラスのグローバル変数を追加

BfTool.py

    cl = []
    api_ticker = None

WEBソケットの修正

BfTool.py

ひとまずは、既存のティッカー情報更新用メソッドはコメントアウトし、
新しい仕組みを追加しました。

class SendWebSocket(WebSocketHandler):
    """
    WEBソケット通信
    1秒毎にティッカー情報を画面に配信します
    """

    def open(self):
        logging.info('SendWebSocket [open] IP :' + self.request.remote_ip)
        self.ioloop = tornado.ioloop.IOLoop.instance()
        # self.send_ticker()
        if self not in cl:
            cl.append(self)

    def on_message(self, message):
        logging.info("WebSocketHandler [on_message]")
        for client in cl:
            client.write_message(message)

    def on_close(self):
        logging.info("SendWebSocket [on_closed]")
        if self in cl:
            cl.remove(self)

    def check_origin(self, origin):
        logging.info("SendWebSocket [check_origin]")

        return True

    def send_ticker(self):

        self.ioloop.add_timeout(time.time() + 2, self.send_ticker)
        if self.ws_connection:
            api = BfApi()
            data = api.call_pub_nub('lightning_ticker_FX_BTC_JPY')
            message = json.dumps(data)
            self.write_message(message)

ティッカー情報配信の開始と停止メソッド追加

BfTool.py

画面から開始、停止ができるようにURLメソッドを追加しました。

class StartTicker(RequestHandler):
    """
    ティッカー情報の更新を開始
    """

    def initialize(self):
        logging.info("StartTicker [initialize]")

    def post(self):
        logging.info("StartTicker [post]")
        global api_ticker
        api_ticker = BfApi()
        api_ticker.start_pub_nub_ticker(cl, ['lightning_ticker_FX_BTC_JPY'])


class StopTicker(RequestHandler):
    """
    ティッカー情報の更新を停止
    """

    def initialize(self):
        logging.info("StopTicker [initialize]")

    def post(self):
        logging.info("StopTicker [post]")
        global api_ticker
        if api_ticker:
            api_ticker.stop_pub_nub_ticker('lightning_ticker_FX_BTC_JPY')

URLマッピングの追加

BfTool.py

    app = tornado.web.Application([
        (r"/", MainHandler),
        (r"/ws", SendWebSocket),
        (r"/balance", GetBalanceHandler),
        (r"/execution", GetExecutionHandler),
        (r"/childOrder", GetChildOrderHandler),
        (r"/sendOrder", SendChildOrderHandler),
        (r"/cancelOrder", CancelChildOrderHandler),
        (r"/startTicker", StartTicker),
        (r"/stopTicker", StopTicker)
        ],
        template_path=os.path.join(os.getcwd(), "templates"),
        static_path=os.path.join(os.getcwd(), "static"),
        js_path=os.path.join(os.getcwd(), "js"),
    )

画面処理の追加

script.js

/**
 * ティッカーの更新を開始
 */
function startTicker() {

   $.ajax({
      url: "http://localhost:8080/startTicker",
      type: "POST",
      success: function(jsonResponse) {
          console.log(jsonResponse);
      },
      error: function() {
      }
   });
}

/**
 * ティッカーの更新を停止
 */
function stopTicker() {

   $.ajax({
      url: "http://localhost:8080/stopTicker",
      type: "POST",
      success: function(jsonResponse) {
          console.log(jsonResponse);
      },
      error: function() {
      }
   });
}

起動してみる

ティッカー情報のリアルタイム配信

スクショじゃ分からないですが、すごい速さでティッカー情報が更新されていきます。
この速度では10件表示していてもすぐ流れて行ってしまうので、本来であれば最新のみ表示するとかで十分ですね。

あとは、自動取引のトリガーとすることが多いようです。

そのうち自動取引の条件なんかを画面で設定し、自動取引の開始・停止が出来るようにしたいですね。

スポンサーリンク


関連するコンテンツ

2018年3月22日Python,開発Python,プログラミング

Posted by doradora