Jump to content
  • 0

Subscription through API


Guest coga

Question

I am trying to subscribe to my first streaming data. I basically copy pasted code from the IG Labs forum (what's the difference between here and there?), added my own api key, and it works. Well, almost. It connected well but I can't subscribe. What am I doing wrong?
 
Subscription returns error 17 Data Adapter not found
 
This is the subscription code:
    fields = ["offer"]
    fields = [field.upper() for field in fields]
    print(fields)
    # Making a new Subscription in MERGE mode
    subscription = Subscription(
        mode="MERGE",
        # items=["EURJPY", "USDJPY"],
        items=["CS.D.USDJPY.CFD.IP"],
        # items=["CS.D.USDJPY.CFD.IP", "CS.D.EURJPY.CFD.IP"],
        fields=fields,
        adapter="POSITIONS")
 
 
and this is the rest of the code (mostly copy pasted from the IG LAB's forum) 
# when this site used as main (for debugging), we need to add its parent
# folder to the PYTHONPATH, so we could import correctly.
if __name__ == "__main__":
    import os
    import sys
    import inspect
    currentdir = os.path.dirname(os.path.abspath(
        inspect.getfile(inspect.currentframe())))
    parentdir = os.path.dirname(currentdir)
    sys.path.insert(0, parentdir)


import utils.log_utils as log_utils
import sys
import logging as lg
import threading
import time
import traceback
import requests
import configs.load_config as load_config

from urllib.request import urlopen as _urlopen
from urllib.parse import (urlparse as parse_url, urljoin, urlencode)


def _url_encode(params):
    return urlencode(params).encode("utf-8")


def _iteritems(d):
    return iter(d.items())


def wait_for_input():
    input("{0:-^80}\n".format("HIT CR TO UNSUBSCRIBE AND DISCONNECT FROM LIGHTSTREAMER"))


CONNECTION_URL_PATH = "lightstreamer/create_session.txt"
BIND_URL_PATH = "lightstreamer/bind_session.txt"
CONTROL_URL_PATH = "lightstreamer/control.txt"
# Request parameter to create and activate a new Table.
OP_ADD = "add"
# Request parameter to delete a previously created Table.
OP_DELETE = "delete"
# Request parameter to force closure of an existing session.
OP_DESTROY = "destroy"
# List of possible server responses
PROBE_CMD = "PROBE"
END_CMD = "END"
LOOP_CMD = "LOOP"
ERROR_CMD = "ERROR"
SYNC_ERROR_CMD = "SYNC ERROR"
OK_CMD = "OK"


class Subscription(object):
    """Represents a Subscription to be submitted to a Lightstreamer Server."""

    def __init__(self, mode, items, fields, adapter=""):
        class_hierarchy = __name__ + "Subscription"
        self.logger = lg.getLogger(class_hierarchy)
        self.item_names = items
        self._items_map = {}
        self.field_names = fields
        self.adapter = adapter
        self.mode = mode
        self.snapshot = "true"
        self._listeners = []

    def _decode(self, value, last):
        """Decode the field value according to
        Lightstreamer Text Protocol specifications.
        """
        if value == "$":
            return u''
        elif value == "#":
            return None
        elif not value:
            return last
        elif value[0] in "#$":
            value = value[1:]

        return value

    def addlistener(self, listener):
        self._listeners.append(listener)

    def notifyupdate(self, item_line):
        """Invoked by LSClient each time Lightstreamer Server pushes
        a new item event.
        """
        # Tokenize the item line as sent by Lightstreamer
        toks = item_line.rstrip('\r\n').split('|')
        undecoded_item = dict(list(zip(self.field_names, toks[1:])))

        # Retrieve the previous item stored into the map, if present.
        # Otherwise create a new empty dict.
        item_pos = int(toks[0])
        curr_item = self._items_map.get(item_pos, {})
        # Update the map with new values, merging with the
        # previous ones if any.
        self._items_map[item_pos] = dict([
            (k, self._decode(v, curr_item.get(k))) for k, v
            in list(undecoded_item.items())
        ])
        # Make an item info as a new event to be passed to listeners
        item_info = {
            'pos': item_pos,
            'name': self.item_names[item_pos - 1],
            'values': self._items_map[item_pos]
        }

        # Update each registered listener with new event
        for on_item_update in self._listeners:
            on_item_update(item_info)


class LSClient(object):
    """Manages the communication with Lightstreamer Server"""

    def __init__(self, base_url, adapter_set="", user="", password=""):
        class_hierarchy = __name__ + "LSClient"
        self.logger = lg.getLogger(class_hierarchy)
        self._base_url = parse_url(base_url)
        self._adapter_set = adapter_set
        self._user = user
        self._password = password
        self._session = {}
        self._subscriptions = {}
        self._current_subscription_key = 0
        self._stream_connection = None
        self._stream_connection_thread = None
        self._bind_counter = 0

    def _encode_params(self, params):
        """Encode the parameter for HTTP POST submissions, but
        only for non empty values..."""
        return _url_encode(
            dict([(k, v) for (k, v) in _iteritems(params) if v])
        )

    def _call(self, base_url, url, params):
        """Open a network connection and performs HTTP Post
        with provided params.
        """
        # Combines the "base_url" with the
        # required "url" to be used for the specific request.
        url = urljoin(base_url.geturl(), url)
        body = self._encode_params(params)
        self.logger.debug("Making a request to <%s> with body <%s>", url, body)
        return _urlopen(url, data=body)

    def _set_control_link_url(self, custom_address=None):
        """Set the address to use for the Control Connection
        in such cases where Lightstreamer is behind a Load Balancer.
        """
        if custom_address is None:
            self._control_url = self._base_url
        else:
            parsed_custom_address = parse_url("//" + custom_address)
            self._control_url = parsed_custom_address._replace(
                scheme=self._base_url[0]
            )

    def _control(self, params):
        """Create a Control Connection to send control commands
        that manage the content of Stream Connection.
        """
        max_reads=100
        params["LS_session"] = self._session["SessionId"]
        response = self._call(self._control_url, CONTROL_URL_PATH, params)
        decoded_response = ""
        for i in range(max_reads):
            response_line = response.readline().decode("utf-8").rstrip()
            if not response_line:
                break
            else:
                decoded_response += response_line + "\n"
        self.logger.debug("Server response: <%s>", decoded_response)
        return decoded_response

    def _read_from_stream(self):
        """Read a single line of content of the Stream Connection."""
        line = self._stream_connection.readline().decode("utf-8").rstrip()
        return line

    def connect(self):
        """Establish a connection to Lightstreamer Server to create a new
        session.
        """
        self.logger.debug("Opening a new session to <%s>",
                          self._base_url.geturl())
        self._stream_connection = self._call(
            self._base_url,
            CONNECTION_URL_PATH,
            {
                "LS_op2": 'create',
                "LS_cid": 'mgQkwtwdysogQz2BJ4Ji kOj2Bg',
                "LS_adapter_set": self._adapter_set,
                "LS_user": self._user,
                "LS_password": self._password}
        )
        stream_line = self._read_from_stream()
        self._handle_stream(stream_line)

    def bind(self):
        """Replace a completely consumed connection in listening for an active
        Session.
        """
        self.logger.debug("Binding to <%s>", self._control_url.geturl())
        self._stream_connection = self._call(
            self._control_url,
            BIND_URL_PATH,
            {
                "LS_session": self._session["SessionId"]
            }
        )

        self._bind_counter += 1
        stream_line = self._read_from_stream()
        self._handle_stream(stream_line)
        self.logger.info("Bound to <%s>", self._control_url.geturl())

    def _handle_stream(self, stream_line):
        if stream_line == OK_CMD:
            self.logger.info("Successfully connected to <%s>",
                             self._base_url.geturl())
            self.logger.debug("Starting to handling real-time stream")
            # Parsing session inkion
            while 1:
                next_stream_line = self._read_from_stream()
                if next_stream_line:
                    session_key, session_value = next_stream_line.split(":", 1)
                    self._session[session_key] = session_value
                else:
                    break

            # Setup of the control link url
            self._set_control_link_url(self._session.get("ControlAddress"))

            # Start a new thread to handle real time updates sent
            # by Lightstreamer Server on the stream connection.
            self._stream_connection_thread = threading.Thread(
                name="StreamThread-{0}".format(self._bind_counter),
                target=self._receive
            )
            self._stream_connection_thread.setDaemon(True)
            self._stream_connection_thread.start()
            self.logger.info("Started handling of real-time stream")
        else:
            lines = self._stream_connection.readlines()
            lines.insert(0, stream_line)
            self.logger.error("Server response error: \n%s",
                              " ".join((str(x) for x in lines)))
            raise IOError()

    def _join(self):
        """Await the natural StreamThread termination."""
        if self._stream_connection_thread:
            self.logger.debug("Waiting for thread to terminate")
            self._stream_connection_thread.join()
            self._stream_connection_thread = None
            self.logger.debug("Thread terminated")

    def disconnect(self):
        """Request to close the session previously opened with the connect()
        invocation.
        """
        if self._stream_connection is not None:
            self.logger.debug("Closing session to <%s>",
                              self._base_url.geturl())
            server_response = self._control({"LS_op": OP_DESTROY})
            # There is no need to explicitly close the connection, since it is
            # handled by thread completion.
            self._join()
            self.logger.info("Closed session to <%s>", self._base_url.geturl())
        else:
            self.logger.warning("No connection to Lightstreamer")

    def subscribe(self, subscription):
        """"Perform a subscription request to Lightstreamer Server."""
        # Register the Subscription with a new subscription key
        self._current_subscription_key += 1
        self._subscriptions[self._current_subscription_key] = subscription

        # Send the control request to perform the subscription
        self.logger.debug("Making a new subscription request")
        server_response = self._control({
            "LS_Table": self._current_subscription_key,
            "LS_op": OP_ADD,
            "LS_data_adapter": subscription.adapter,
            "LS_mode": subscription.mode,
            "LS_schema": " ".join(subscription.field_names),
            "LS_id": " ".join(subscription.item_names),
        })
        if server_response.startswith(OK_CMD):
            self.logger.info("Successfully subscribed ")
        else:
            self.logger.warning("Subscription error:"+ str(server_response))
        return self._current_subscription_key

    def unsubscribe(self, subcription_key):
        """Unregister the Subscription associated to the
        specified subscription_key.
        """
        self.logger.debug("Making an unsubscription request")
        if subcription_key in self._subscriptions:
            server_response = self._control({
                "LS_Table": subcription_key,
                "LS_op": OP_DELETE
            })

            if server_response == OK_CMD:
                del self._subscriptions[subcription_key]
                self.logger.info("Successfully unsubscribed")
            else:
                self.logger.warning("Unsubscription error")
        else:
            self.logger.warning(
                "No subscription key %s found!", subcription_key)

    def _forward_update_message(self, update_message):
        """Forwards the real time update to the relative
        Subscription instance for further dispatching to its listeners.
        """
        self.logger.debug("Received update message: <%s>", update_message)
        try:
            tok = update_message.split(',', 1)
            table, item = int(tok[0]), tok[1]
            if table in self._subscriptions:
                self._subscriptions[table].notifyupdate(item)
            else:
                self.logger.warning("No subscription found!")
        except Exception:
            self.logger.exception(traceback.format_exc())

    def _receive(self):
        rebind = False
        receive = True
        while receive:
            self.logger.debug("Waiting for a new message")
            try:
                message = self._read_from_stream()
                self.logger.debug("Received message: <%s>", message)
                if not message.strip():
                    message = None
            except Exception:
                self.logger.error("Communication error")
                self.logger.exception(traceback.format_exc())
                message = None

            if message is None:
                receive = False
                self.logger.warning("No new message received")
            elif message == PROBE_CMD:
                # Skipping the PROBE message, keep on receiving messages.
                self.logger.debug("PROBE message")
            elif message.startswith(ERROR_CMD):
                # Terminate the receiving loop on ERROR message
                receive = False
                self.logger.error("ERROR")
            elif message.startswith(LOOP_CMD):
                # Terminate the the receiving loop on LOOP message.
                # A complete implementation should proceed with
                # a rebind of the session.
                self.logger.debug("LOOP")
                receive = False
                rebind = True
            elif message.startswith(SYNC_ERROR_CMD):
                # Terminate the receiving loop on SYNC ERROR message.
                # A complete implementation should create a new session
                # and re-subscribe to all the old items and relative fields.
                self.logger.error("SYNC ERROR")
                receive = False
            elif message.startswith(END_CMD):
                # Terminate the receiving loop on END message.
                # The session has been forcibly closed on the server side.
                # A complete implementation should handle the
                # "cause_code" if present.
                self.logger.info("Connection closed by the server")
                receive = False
            elif message.startswith("Preamble"):
                # Skipping Preamble message, keep on receiving messages.
                self.logger.debug("Preamble")
            else:
                self._forward_update_message(message)

        if not rebind:
            self.logger.debug("No rebind to <%s>, clearing internal session data",
                              self._base_url.geturl())
            # Clear internal data structures for session
            # and subscriptions management.
            self._stream_connection = None
            self._session.clear()
            self._subscriptions.clear()
            self._current_subscription_key = 0
        else:
            self.logger.debug("Binding to this active session")
            self.bind()


def authenticate(configs, demo):
    # IG rest API parameters
    if demo:
        rest_api_key = configs.ig_demo.access
        rest_identifier = configs.ig_demo["rest_user"]
        rest_password = configs.ig_demo["rest_pass"]
    else:
        rest_api_key = configs.ig.access
        rest_identifier = configs.ig["rest_user"]
        rest_password = configs.ig["rest_pass"]

    # IG rest login request
    ig_domain = "https://demo-api.ig.com/" if demo else "https://api.ig.com/"
    rest_url = ig_domain + "gateway/deal/session"

    headers = {}
    headers["Content-Type"] = "application/json; charset=UTF-8"
    headers["Accept"] = "application/json; charset=UTF-8"
    headers["Version"] = "2"
    headers["X-IG-API-KEY"] = rest_api_key

    request_json = {}
    request_json["identifier"] = rest_identifier
    request_json["password"] = rest_password

    rest_response = requests.request(
        "POST", rest_url, headers=headers, json=request_json)
    if rest_response.status_code != 200:
        print("error", rest_response.status_code, rest_url, rest_response.text)
        sys.exit(0)

    # collect params from IG rest login response

    xst = rest_response.headers["X-SECURITY-TOKEN"]
    cst = rest_response.headers["CST"]
    pwd = 'CST-' + cst + '|XST-' + xst
    return pwd


if __name__ == '__main__':
    demo = False
    # Establishing a new connection to Lightstreamer Server
    print("Starting connection")
    # lightstreamer_client = LSClient("http://localhost:8080", "DEMO")
    # lightstreamer_client = LSClient("http://push.lightstreamer.com", "DEMO")
    # Establishing a new connection to Lightstreamer Server

    IG_url = "https://demo-apd.marketdatasystems.com" if demo else "https://apd.marketdatasystems.com"

    configs = load_config.load_config()
    usr = configs.ig_demo["rest_user"] if demo else configs.ig["rest_user"]
    pwd = authenticate(configs, demo)

    lightstreamer_client = LSClient(base_url=IG_url,
                                    adapter_set="DEFAULT",
                                    user=usr,
                                    password=pwd)

    try:
        lightstreamer_client.connect()
    except Exception as e:
        print("Unable to connect to Lightstreamer Server")
        print(traceback.format_exc())
        sys.exit(1)
    fields = ["stock_name", "last_price", "time", "bid", "ask"]
    fields = ["UPDATE_TIME", "bid", "offer"]
    fields = ["offer"]
    fields = [field.upper() for field in fields]
    print(fields)
    # Making a new Subscription in MERGE mode
    subscription = Subscription(
        mode="MERGE",
        # items=["EURJPY", "USDJPY"],
        items=["CS.D.USDJPY.CFD.IP"],
        # items=["CS.D.USDJPY.CFD.IP", "CS.D.EURJPY.CFD.IP"],
        fields=fields,
        adapter="POSITIONS")

    # A simple function acting as a Subscription listener

    def on_item_update(item_update):
        print("{stock_name:<19}: Last{last_price:>6} - Time {time:<8} - "
              "Bid {bid:>5} - Ask {ask:>5}".format(**item_update["values"]))

    # Adding the "on_item_update" function to Subscription
    subscription.addlistener(on_item_update)

    # Registering the Subscription
    sub_key = lightstreamer_client.subscribe(subscription)

    wait_for_input()

    # Unsubscribing from Lightstreamer by using the subscription key
    lightstreamer_client.unsubscribe(sub_key)

    # Disconnecting
    lightstreamer_client.disconnect()
 
Link to comment

0 answers to this question

Recommended Posts

There have been no answers to this question yet

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now
  • General Statistics

    • Total Topics
      21,259
    • Total Posts
      90,868
    • Total Members
      41,369
    • Most Online
      7,522
      10/06/21 10:53

    Newest Member
    mano_060
    Joined 05/02/23 19:34
  • Posts

    • Q: How can I experience the Fairy Cat metaverse? A: Debuting in January 2023, Fairy Cat is one of the most popular crypto metaverse games. While earlier games (World of Warcraft, League of Legends, Destiny 2) either precede blockchain technology or have not integrated it into their games at all, more recent metaverse iterations - such as FairyCat - integrate metaverse games and blockchain platforms. These so-called "crypto metaverse" games use blockchain technology to achieve encrypted in-game cash, play-to-earn (P2E) and non-fungible tokens (NFT), decentralization and other functionality to bring extraordinary benefits to metaverse games. relevant impact. You don't need to be a crypto expert to play this innovative game, although a little crypto knowledge is helpful to take advantage of all the functionality of FairyCat. Start exploring Fairy Cat: 1. Go to the Fairy Cat website to learn more about the game's currency models, then download the Metamask wallet. 2. Start earning money by opening the game in the Metamask wallet browser. At this point, the game will connect to your wallet account. Don't worry that this game will steal your wallet balance, because it doesn't need your account authorization data, it just needs your wallet data. 3. Once you are in Fairy Cat, you can go through a money transfer tutorial to learn the basic controls. Then you can earn money in the following three methods: Method 1: Adopt a cat to help you find treasures, and you can make profit after selling the treasures. Method 2: Synthesize an LV1 elf cat through 5 elves, and then sell the LV1 elf cat to earn money. This can be understood simply as using 5U to earn 10U (50% probability), and you can buy items to increase the synthesis rate. Method 3: Invite your friends to play the game. After your friends earn money, you can also get commission income. The income is very high.
    • HI,  Please if any IG supporting staff can answer the questions below? 1. in the upcoming AMC/APE vote on 14 March 2023, can shareholder that hold AMC and APE at IG able to make a vote? I also have AMC and APE at IBKR and they did send out a notification to inform me that I can do proxy vote. Will IG do the same? 2. the upcoming vote is to determine APE covert back to AMC then do reverse spilt. Last year after APE issued, IG staff told me that they have to move APE to share dealing account from my ISA account as ISA is not eligible to hold APE (which is completely unbelivable, my APE at AJ Bell and HL are still sitting in ISA account). Anyway, if the conversion and reverse spilt is going to happen, will IG move my APE back to ISA account for the corporate action? Many Thanks
    • The epidemic in recent years has produced profound changes in people's lifestyles, and many of their activities have started to be developed in the virtual world, which plays an important role in promoting the development of the universe of meta. Games, as an important part of the universe of meta, will be the first industry to gain the benefits , but the following conditions are still indispensable for blockchain games to achieve better development.   - Wallets and Exchanges. Through the use of wallets, game users can truly take control of the assets in the game, and exchanges provide a trading platform and liquidity for these game assets NFT, helping users trade and revitalize their game assets. Wallets are a key application for depositing blockchain assets. Currently, wallets such as Metamask are not enough to support the extensive adoption of block games, and wallets will further advance community drive the popularity of blockchain games.  - Game playability. The biggest difference between blockchain games and traditional games is that gamers are also investors. Many blockchain games have gone out of course, treating all users as investors only and forgetting that the essence of making games is to develop high-quality games that users are interested in paying for. If most people play a game just to make money, and no one is ready to pay for it, it becomes a side-scam. The revenue of the participants of the side scam comes from the money of the people who come later, and if there are not enough people who come later, they will not be able to cash in the revenue, leading to the breakdown.  - Speed and performance. A high level of concurrency is the characteristic of game applications, so Wax, Coinan Chain and Polygon, which are public chains with a high level of concurrency and low fees, have attracted a large number of game developers. In the future, with the maturity of technologies such as Ethernet Layer 2 and slice scaling, as well as the maturity of other gaming public chain technologies compatible with EVM, blockchain games will have a better development and operation environment.  - Cross-chain and interaction. Cross-chain and interaction of game assets will be a direction of future development of metaverse games, and blockchain should be connected and accessible like the Internet. The maturity of cross-chain technology in the future will promote the cross-chain transfer of game assets and NFT, and improve the interaction experience of the system                                                                                     The game should maintain the operation of the project by relying on the payment of some users or selling profitable products, just like the new blockchain game Fairy Cat in 2023, which developed diverse gameplay for players and increased the playability of the game, thus got the recognition of many players. Blockchain games need to work hard on game experience and playability. Token economics is essentially just a kind of interest structure , and even good token economics cannot replace the development, operation and profitability of the project itself. It is believed that with the maturity of the above conditions, more and more game developers and investors will enter this industry and together promote the production of some blockchain games with beautiful graphics, first-class experience and high playability.
×
×
  • Create New...