Jump to content
  • 0

Subscription through API


Guest coga

Question

I am trying to subscribe to my first streaming data. I basically copy pasted code from the IG Labs forum (what's the difference between here and there?), added my own api key, and it works. Well, almost. It connected well but I can't subscribe. What am I doing wrong?
 
Subscription returns error 17 Data Adapter not found
 
This is the subscription code:
    fields = ["offer"]
    fields = [field.upper() for field in fields]
    print(fields)
    # Making a new Subscription in MERGE mode
    subscription = Subscription(
        mode="MERGE",
        # items=["EURJPY", "USDJPY"],
        items=["CS.D.USDJPY.CFD.IP"],
        # items=["CS.D.USDJPY.CFD.IP", "CS.D.EURJPY.CFD.IP"],
        fields=fields,
        adapter="POSITIONS")
 
 
and this is the rest of the code (mostly copy pasted from the IG LAB's forum) 
# when this site used as main (for debugging), we need to add its parent
# folder to the PYTHONPATH, so we could import correctly.
if __name__ == "__main__":
    import os
    import sys
    import inspect
    currentdir = os.path.dirname(os.path.abspath(
        inspect.getfile(inspect.currentframe())))
    parentdir = os.path.dirname(currentdir)
    sys.path.insert(0, parentdir)


import utils.log_utils as log_utils
import sys
import logging as lg
import threading
import time
import traceback
import requests
import configs.load_config as load_config

from urllib.request import urlopen as _urlopen
from urllib.parse import (urlparse as parse_url, urljoin, urlencode)


def _url_encode(params):
    return urlencode(params).encode("utf-8")


def _iteritems(d):
    return iter(d.items())


def wait_for_input():
    input("{0:-^80}\n".format("HIT CR TO UNSUBSCRIBE AND DISCONNECT FROM LIGHTSTREAMER"))


CONNECTION_URL_PATH = "lightstreamer/create_session.txt"
BIND_URL_PATH = "lightstreamer/bind_session.txt"
CONTROL_URL_PATH = "lightstreamer/control.txt"
# Request parameter to create and activate a new Table.
OP_ADD = "add"
# Request parameter to delete a previously created Table.
OP_DELETE = "delete"
# Request parameter to force closure of an existing session.
OP_DESTROY = "destroy"
# List of possible server responses
PROBE_CMD = "PROBE"
END_CMD = "END"
LOOP_CMD = "LOOP"
ERROR_CMD = "ERROR"
SYNC_ERROR_CMD = "SYNC ERROR"
OK_CMD = "OK"


class Subscription(object):
    """Represents a Subscription to be submitted to a Lightstreamer Server."""

    def __init__(self, mode, items, fields, adapter=""):
        class_hierarchy = __name__ + "Subscription"
        self.logger = lg.getLogger(class_hierarchy)
        self.item_names = items
        self._items_map = {}
        self.field_names = fields
        self.adapter = adapter
        self.mode = mode
        self.snapshot = "true"
        self._listeners = []

    def _decode(self, value, last):
        """Decode the field value according to
        Lightstreamer Text Protocol specifications.
        """
        if value == "$":
            return u''
        elif value == "#":
            return None
        elif not value:
            return last
        elif value[0] in "#$":
            value = value[1:]

        return value

    def addlistener(self, listener):
        self._listeners.append(listener)

    def notifyupdate(self, item_line):
        """Invoked by LSClient each time Lightstreamer Server pushes
        a new item event.
        """
        # Tokenize the item line as sent by Lightstreamer
        toks = item_line.rstrip('\r\n').split('|')
        undecoded_item = dict(list(zip(self.field_names, toks[1:])))

        # Retrieve the previous item stored into the map, if present.
        # Otherwise create a new empty dict.
        item_pos = int(toks[0])
        curr_item = self._items_map.get(item_pos, {})
        # Update the map with new values, merging with the
        # previous ones if any.
        self._items_map[item_pos] = dict([
            (k, self._decode(v, curr_item.get(k))) for k, v
            in list(undecoded_item.items())
        ])
        # Make an item info as a new event to be passed to listeners
        item_info = {
            'pos': item_pos,
            'name': self.item_names[item_pos - 1],
            'values': self._items_map[item_pos]
        }

        # Update each registered listener with new event
        for on_item_update in self._listeners:
            on_item_update(item_info)


class LSClient(object):
    """Manages the communication with Lightstreamer Server"""

    def __init__(self, base_url, adapter_set="", user="", password=""):
        class_hierarchy = __name__ + "LSClient"
        self.logger = lg.getLogger(class_hierarchy)
        self._base_url = parse_url(base_url)
        self._adapter_set = adapter_set
        self._user = user
        self._password = password
        self._session = {}
        self._subscriptions = {}
        self._current_subscription_key = 0
        self._stream_connection = None
        self._stream_connection_thread = None
        self._bind_counter = 0

    def _encode_params(self, params):
        """Encode the parameter for HTTP POST submissions, but
        only for non empty values..."""
        return _url_encode(
            dict([(k, v) for (k, v) in _iteritems(params) if v])
        )

    def _call(self, base_url, url, params):
        """Open a network connection and performs HTTP Post
        with provided params.
        """
        # Combines the "base_url" with the
        # required "url" to be used for the specific request.
        url = urljoin(base_url.geturl(), url)
        body = self._encode_params(params)
        self.logger.debug("Making a request to <%s> with body <%s>", url, body)
        return _urlopen(url, data=body)

    def _set_control_link_url(self, custom_address=None):
        """Set the address to use for the Control Connection
        in such cases where Lightstreamer is behind a Load Balancer.
        """
        if custom_address is None:
            self._control_url = self._base_url
        else:
            parsed_custom_address = parse_url("//" + custom_address)
            self._control_url = parsed_custom_address._replace(
                scheme=self._base_url[0]
            )

    def _control(self, params):
        """Create a Control Connection to send control commands
        that manage the content of Stream Connection.
        """
        max_reads=100
        params["LS_session"] = self._session["SessionId"]
        response = self._call(self._control_url, CONTROL_URL_PATH, params)
        decoded_response = ""
        for i in range(max_reads):
            response_line = response.readline().decode("utf-8").rstrip()
            if not response_line:
                break
            else:
                decoded_response += response_line + "\n"
        self.logger.debug("Server response: <%s>", decoded_response)
        return decoded_response

    def _read_from_stream(self):
        """Read a single line of content of the Stream Connection."""
        line = self._stream_connection.readline().decode("utf-8").rstrip()
        return line

    def connect(self):
        """Establish a connection to Lightstreamer Server to create a new
        session.
        """
        self.logger.debug("Opening a new session to <%s>",
                          self._base_url.geturl())
        self._stream_connection = self._call(
            self._base_url,
            CONNECTION_URL_PATH,
            {
                "LS_op2": 'create',
                "LS_cid": 'mgQkwtwdysogQz2BJ4Ji kOj2Bg',
                "LS_adapter_set": self._adapter_set,
                "LS_user": self._user,
                "LS_password": self._password}
        )
        stream_line = self._read_from_stream()
        self._handle_stream(stream_line)

    def bind(self):
        """Replace a completely consumed connection in listening for an active
        Session.
        """
        self.logger.debug("Binding to <%s>", self._control_url.geturl())
        self._stream_connection = self._call(
            self._control_url,
            BIND_URL_PATH,
            {
                "LS_session": self._session["SessionId"]
            }
        )

        self._bind_counter += 1
        stream_line = self._read_from_stream()
        self._handle_stream(stream_line)
        self.logger.info("Bound to <%s>", self._control_url.geturl())

    def _handle_stream(self, stream_line):
        if stream_line == OK_CMD:
            self.logger.info("Successfully connected to <%s>",
                             self._base_url.geturl())
            self.logger.debug("Starting to handling real-time stream")
            # Parsing session inkion
            while 1:
                next_stream_line = self._read_from_stream()
                if next_stream_line:
                    session_key, session_value = next_stream_line.split(":", 1)
                    self._session[session_key] = session_value
                else:
                    break

            # Setup of the control link url
            self._set_control_link_url(self._session.get("ControlAddress"))

            # Start a new thread to handle real time updates sent
            # by Lightstreamer Server on the stream connection.
            self._stream_connection_thread = threading.Thread(
                name="StreamThread-{0}".format(self._bind_counter),
                target=self._receive
            )
            self._stream_connection_thread.setDaemon(True)
            self._stream_connection_thread.start()
            self.logger.info("Started handling of real-time stream")
        else:
            lines = self._stream_connection.readlines()
            lines.insert(0, stream_line)
            self.logger.error("Server response error: \n%s",
                              " ".join((str(x) for x in lines)))
            raise IOError()

    def _join(self):
        """Await the natural StreamThread termination."""
        if self._stream_connection_thread:
            self.logger.debug("Waiting for thread to terminate")
            self._stream_connection_thread.join()
            self._stream_connection_thread = None
            self.logger.debug("Thread terminated")

    def disconnect(self):
        """Request to close the session previously opened with the connect()
        invocation.
        """
        if self._stream_connection is not None:
            self.logger.debug("Closing session to <%s>",
                              self._base_url.geturl())
            server_response = self._control({"LS_op": OP_DESTROY})
            # There is no need to explicitly close the connection, since it is
            # handled by thread completion.
            self._join()
            self.logger.info("Closed session to <%s>", self._base_url.geturl())
        else:
            self.logger.warning("No connection to Lightstreamer")

    def subscribe(self, subscription):
        """"Perform a subscription request to Lightstreamer Server."""
        # Register the Subscription with a new subscription key
        self._current_subscription_key += 1
        self._subscriptions[self._current_subscription_key] = subscription

        # Send the control request to perform the subscription
        self.logger.debug("Making a new subscription request")
        server_response = self._control({
            "LS_Table": self._current_subscription_key,
            "LS_op": OP_ADD,
            "LS_data_adapter": subscription.adapter,
            "LS_mode": subscription.mode,
            "LS_schema": " ".join(subscription.field_names),
            "LS_id": " ".join(subscription.item_names),
        })
        if server_response.startswith(OK_CMD):
            self.logger.info("Successfully subscribed ")
        else:
            self.logger.warning("Subscription error:"+ str(server_response))
        return self._current_subscription_key

    def unsubscribe(self, subcription_key):
        """Unregister the Subscription associated to the
        specified subscription_key.
        """
        self.logger.debug("Making an unsubscription request")
        if subcription_key in self._subscriptions:
            server_response = self._control({
                "LS_Table": subcription_key,
                "LS_op": OP_DELETE
            })

            if server_response == OK_CMD:
                del self._subscriptions[subcription_key]
                self.logger.info("Successfully unsubscribed")
            else:
                self.logger.warning("Unsubscription error")
        else:
            self.logger.warning(
                "No subscription key %s found!", subcription_key)

    def _forward_update_message(self, update_message):
        """Forwards the real time update to the relative
        Subscription instance for further dispatching to its listeners.
        """
        self.logger.debug("Received update message: <%s>", update_message)
        try:
            tok = update_message.split(',', 1)
            table, item = int(tok[0]), tok[1]
            if table in self._subscriptions:
                self._subscriptions[table].notifyupdate(item)
            else:
                self.logger.warning("No subscription found!")
        except Exception:
            self.logger.exception(traceback.format_exc())

    def _receive(self):
        rebind = False
        receive = True
        while receive:
            self.logger.debug("Waiting for a new message")
            try:
                message = self._read_from_stream()
                self.logger.debug("Received message: <%s>", message)
                if not message.strip():
                    message = None
            except Exception:
                self.logger.error("Communication error")
                self.logger.exception(traceback.format_exc())
                message = None

            if message is None:
                receive = False
                self.logger.warning("No new message received")
            elif message == PROBE_CMD:
                # Skipping the PROBE message, keep on receiving messages.
                self.logger.debug("PROBE message")
            elif message.startswith(ERROR_CMD):
                # Terminate the receiving loop on ERROR message
                receive = False
                self.logger.error("ERROR")
            elif message.startswith(LOOP_CMD):
                # Terminate the the receiving loop on LOOP message.
                # A complete implementation should proceed with
                # a rebind of the session.
                self.logger.debug("LOOP")
                receive = False
                rebind = True
            elif message.startswith(SYNC_ERROR_CMD):
                # Terminate the receiving loop on SYNC ERROR message.
                # A complete implementation should create a new session
                # and re-subscribe to all the old items and relative fields.
                self.logger.error("SYNC ERROR")
                receive = False
            elif message.startswith(END_CMD):
                # Terminate the receiving loop on END message.
                # The session has been forcibly closed on the server side.
                # A complete implementation should handle the
                # "cause_code" if present.
                self.logger.info("Connection closed by the server")
                receive = False
            elif message.startswith("Preamble"):
                # Skipping Preamble message, keep on receiving messages.
                self.logger.debug("Preamble")
            else:
                self._forward_update_message(message)

        if not rebind:
            self.logger.debug("No rebind to <%s>, clearing internal session data",
                              self._base_url.geturl())
            # Clear internal data structures for session
            # and subscriptions management.
            self._stream_connection = None
            self._session.clear()
            self._subscriptions.clear()
            self._current_subscription_key = 0
        else:
            self.logger.debug("Binding to this active session")
            self.bind()


def authenticate(configs, demo):
    # IG rest API parameters
    if demo:
        rest_api_key = configs.ig_demo.access
        rest_identifier = configs.ig_demo["rest_user"]
        rest_password = configs.ig_demo["rest_pass"]
    else:
        rest_api_key = configs.ig.access
        rest_identifier = configs.ig["rest_user"]
        rest_password = configs.ig["rest_pass"]

    # IG rest login request
    ig_domain = "https://demo-api.ig.com/" if demo else "https://api.ig.com/"
    rest_url = ig_domain + "gateway/deal/session"

    headers = {}
    headers["Content-Type"] = "application/json; charset=UTF-8"
    headers["Accept"] = "application/json; charset=UTF-8"
    headers["Version"] = "2"
    headers["X-IG-API-KEY"] = rest_api_key

    request_json = {}
    request_json["identifier"] = rest_identifier
    request_json["password"] = rest_password

    rest_response = requests.request(
        "POST", rest_url, headers=headers, json=request_json)
    if rest_response.status_code != 200:
        print("error", rest_response.status_code, rest_url, rest_response.text)
        sys.exit(0)

    # collect params from IG rest login response

    xst = rest_response.headers["X-SECURITY-TOKEN"]
    cst = rest_response.headers["CST"]
    pwd = 'CST-' + cst + '|XST-' + xst
    return pwd


if __name__ == '__main__':
    demo = False
    # Establishing a new connection to Lightstreamer Server
    print("Starting connection")
    # lightstreamer_client = LSClient("http://localhost:8080", "DEMO")
    # lightstreamer_client = LSClient("http://push.lightstreamer.com", "DEMO")
    # Establishing a new connection to Lightstreamer Server

    IG_url = "https://demo-apd.marketdatasystems.com" if demo else "https://apd.marketdatasystems.com"

    configs = load_config.load_config()
    usr = configs.ig_demo["rest_user"] if demo else configs.ig["rest_user"]
    pwd = authenticate(configs, demo)

    lightstreamer_client = LSClient(base_url=IG_url,
                                    adapter_set="DEFAULT",
                                    user=usr,
                                    password=pwd)

    try:
        lightstreamer_client.connect()
    except Exception as e:
        print("Unable to connect to Lightstreamer Server")
        print(traceback.format_exc())
        sys.exit(1)
    fields = ["stock_name", "last_price", "time", "bid", "ask"]
    fields = ["UPDATE_TIME", "bid", "offer"]
    fields = ["offer"]
    fields = [field.upper() for field in fields]
    print(fields)
    # Making a new Subscription in MERGE mode
    subscription = Subscription(
        mode="MERGE",
        # items=["EURJPY", "USDJPY"],
        items=["CS.D.USDJPY.CFD.IP"],
        # items=["CS.D.USDJPY.CFD.IP", "CS.D.EURJPY.CFD.IP"],
        fields=fields,
        adapter="POSITIONS")

    # A simple function acting as a Subscription listener

    def on_item_update(item_update):
        print("{stock_name:<19}: Last{last_price:>6} - Time {time:<8} - "
              "Bid {bid:>5} - Ask {ask:>5}".format(**item_update["values"]))

    # Adding the "on_item_update" function to Subscription
    subscription.addlistener(on_item_update)

    # Registering the Subscription
    sub_key = lightstreamer_client.subscribe(subscription)

    wait_for_input()

    # Unsubscribing from Lightstreamer by using the subscription key
    lightstreamer_client.unsubscribe(sub_key)

    # Disconnecting
    lightstreamer_client.disconnect()
 
Link to comment

0 answers to this question

Recommended Posts

There have been no answers to this question yet

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now
×
×
  • Create New...
us