Skip to content

Latest commit

 

History

History
3101 lines (2419 loc) · 102 KB

hft.org

File metadata and controls

3101 lines (2419 loc) · 102 KB

hft project

Main org file for the hft project

scarecapital.com/hft

README.md

<img src="http://scarcecapital.com/assets/hft-blue.png" width="100px">

[Blog](http://scarcecapital.com/hft)

hft is a small project with a big ambition. We aim to build the worlds best
algorithmic trading platform using the best off-the-shelf open source
technology stack to be found.

hft is created and maintained by [Tony Day](http://scarcecapital.com).

## Quick start

There is no quick start for hft.  The easiest way to get up to speed is to read the project [blog](http://scarcecapital.com/hft).  If you're interested in contributing to development or find a logic bug, then fork me with:

```
$ git clone https://github.com/tonyday567/hft.git
```

# technology stack #

The world of high frequency trading is a broad church of opinion, technology, ideas and motivations.  hft is being developed using many different tools:

### [emacs](http://www.gnu.org/software/emacs/), [org-mode](http://orgmode.org) and literate programming###

[hft.org](https://github.com/tonyday567/hft/blob/master/hft.org) is the nerve
center of active development and contains just about all the important code, research notes
and design tools being used.

The project makes heavy use of [babel](http://orgmode.org/worg/org-contrib/babel/) to pick and mix between coding environments and languages, whilst still remaining [literate](http://www.haskell.org/haskellwiki/Literate_programming):

>The main idea is to regard a program as a communication to human beings rather than as a set of instructions to a computer. ~ Knuth

Similarly, a project such as hft is as much about communication between human beings as it is about maintenance of source code.

### [R](http://www.r-project.org) ###

R is a strongly functional but imperative language being used for rapid
development and research of hft and algo ideas as they arise. Most everything
that you can think of (databases, broker interfaces, statistical analysis,
visualization) has an R package ready to get you up and going in 5 minutes.

### [haskell](http://www.haskell.org/haskellwiki/Haskell) ###

R can be many things but what it is least set up for is development of
asyncronous code. To fill this gap, the project is using haskell to frame the
system as and when it develops.

### [Interactive Brokers](http://www.interactivebrokers.com/en/main.php) ###

Eventually, hft will be broker independent but during the development phase IB
is the test case. Interactive has the most mature API that works out of the
box and a demo account so that hft can come pre-plumbed so that (eventually)
the project can also run out of the box.

Interactive Brokers consolidates tick data into 0.3 second time slices so it
isn't appropriate for low-latency work.

### [iqfeed](http://www.iqfeed.net)  ###

Just because it's open-source doesn't mean that it's cost free. iqfeed has
been chosen as an initial data feed to base project R&D efforts on. iqfeed
costs dollars but the software can be downloaded for free and a demo version
allows live data to flow with a lag.

A useful way to support the hft project is to let DTN know if you decide to
purshase iqfeed due to the project.

## bug tracker

Have a bug or a feature request? [Please open a new issue](https://github.com/tonyday567/hft/issues). 

## Community

hft is sponsored by [Scarce Capital](http://scarcecapital.com) as an adjunct to client advisory services.

Follow [@scarcecapital on Twitter](http://twitter.com/scarcecapital).

Read, subscribe (and contribute!) to the [The Official hft Blog](http://scarcecapital.com/hft).

The project is partially due to active discussions on the [Open Source HFT Linkedin group](http://www.linkedin.com/groups?home=&gid=4405119&trk=anet_ug_hm)

## Contributing

Please submit all pull requests against the master branch.

Thanks!

## Authors

**Tony Day**

+ [http://twitter.com/tonyday567](http://twitter.com/tonyday567)
+ [http://github.com/tonyday567](http://github.com/tonyday567)


## Copyright and license

Copyright 2013 Scarce Capital.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this work except in compliance with the License.
You may obtain a copy of the License in the LICENSE file, or at:

  [http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

0.2 schedule

  • connect IB
  • start writing controller
    • feed control
  • integrate disruptor

hft project design

The solution space for a highish-frequency, automated, algorithmic trading system is wide. Most solutions out there are propietary, expensive and expansive.

existing design choice

The typical system platform choices for the aspiring fast trader can be boiled down to a few options:

  • pure HFT

    Proprietary real-time event processing that almost has to be in a language with no garbage collection (C, C++ or Java) to avoid millisec delays (ie low latency processes).

    • dedicated co-located servers
    • real-time order book analytics
    • real-time price information streams
    • algorithms ‘hard-wired’
  • Big Data

    A statistical R&D undertaking using matlab, R, julia and fast database technology.

  • Old school

    Modification of a human-centric trading system focused on broker interfaces and visualization technologies (charting).

Whatever the orientation there are significant weaknesses to existing solutions:

  • most open source projects come with strings attached and are largely inducements to purchase this or that commercial service or product.
  • build-your-own solutions need to be largely built from scratch.

project design choices

There are several design features that drive the hft project and we hope offer a comparative advantage:

  • HFT on a budget: Development is focused on using the cheapest technologies where no free option exists.
  • literate programming documentation: markets are complex and so is large-scale project code. The heart of this project is experimentation with how to go about large-scale project formulation using a literate programming ethic.
  • semi-HFT: much HFT trading is about pure speed - being the first to react to an obvious mis-pricing, front-running a lazy or half-hidden market order are two obvious examples. We believe there is a gap in the market between pure-speed HFT and human-intervention algo trading that the project is seeking to exploit.
  • meta-algo: the entire system and process is an algorithm to be searched, optimised and refactored.
  • open source: the hft project is open source purely and simply, licenced under the generous Apache system.
  • modern toolkit: the project is oriented towards using higher-level languages and concepts for rapid, robust development. Together with a literate programming style, this translates to using the right tool for the right job and making less compromises with the goals of the project.

Haskell as the solution

The candidate solution to the design criteria is to use haskell for the system coding:

system specification

hft is in an experimental phase and, as such, there is a need for flexibility in the top-down design of the system. To achieve this, the overall design is first being modelled using graphviz. The current candidate system is:

candidate svg dot

digraph G {
        node [label="\N"];
        node [style=filled, color="#1f3950",fontcolor="#eeeeee",shape=box]; 
        subgraph cluster_market_data {
                graph [label="market data", color="#909090"];
                exchange [shape=egg,color="#ff111111",fontcolor="#101010",label="exchanges"];
                aggregator [shape=egg,color="#cc11cc22",fontcolor="#101010",label="data stream"];
                localport [label="local node"];
                exchange -> aggregator [dir=none];
                aggregator -> localport [dir=both];
        }
        subgraph cluster_offwire {
                graph [label="offwire",
                        color="#909090"];
                offwirealgo [label="offline algo"];
                observer;
                databases;
                observer -> databases [color=red,label="write",fontcolor=red];
        }
        subgraph cluster_onwire {
                graph [label="onwire",
                        color="#909090"];
                node [style=filled];
                disruptor [label="event server"];
                eventalgo [label="algo"];
                controller;
                controller -> eventalgo [color="#aaaaaa",dir=both]
                disruptor -> listener;
                disruptor -> eventalgo;
                disruptor -> controller;
                controller -> disruptor [color="#0080ff"];
        }
        subgraph cluster_broker {
                graph [label="broker data",
                        color="#909090"];
                broker [shape=egg,color="#ff111111",fontcolor="#101010",label="brokers"];
                brokeraggregator [shape=egg,color="#cc11cc22",fontcolor="#101010",label="aggregation"];
                broker -> brokeraggregator [dir=none];
                brokeraggregator -> trader [dir=both];
        }
        localport -> observer [color="#aaaaaa",style=dotted];
        controller -> localport [color="#aaaaaa"];
        localport -> disruptor [color="#0080ff"];
        listener -> observer [color="#aaaaaa",style=dotted];
        controller -> observer [color="#aaaaaa",style=dotted];
        controller -> trader [color="#aaaaaa",dir=both];
        controller -> offwirealgo [color="#aaaaaa",dir=both];
        databases -> offwirealgo [color=red,label="read",fontcolor=red];
        trader -> observer [color="#aaaaaa",style=dotted];
        eventalgo -> observer [color="#aaaaaa",style=dotted];
        offwirealgo -> observer [color="#aaaaaa",style=dotted];
}

img/candidate.svg

  • blue boxes represent individual components of the system
  • other colors represent external systems and data sources
  • each edge of the chart represents a messaging sytem requirement
  • there are two main one-way message passing routines that probably need to be very very fast (blue lines)
  • there is one read from database and one write to database (red lines)
  • every component registers to an observer component that records system state and dynamics (grey dotted).

The components have been grouped into several clusters:

  • market data: representing trade data, order book and news information flowing from outside the sytem to a local data node.
  • broker data: representing communication with trading mechanisms
  • onwire: components that are “in the event stream”. This is motivated by the specifications and documentation of the disruptor which argues that a single thread “wheel” is the best way to enable fast processing of market data into trading orders.
  • offwire: this represents algorithms and processing that are not on the single-thread process. The motivation here is to test the hypothesis in the disruptor argument.

There are several ideas that are being tested:

  • that the entire system should be the subject of search and optimisation, rather than componentry. One example of this is separation of complex event definitions from the statistical analysis once events are defined.
  • there is a focus on automation and machine learning. As such there is no place for human interaction. In particular, no visualization is required.
  • messaging between components can be the same general process. The components can also be tested in exactly the same way (such as speed and robustness testing)

dot files

Some alternative graphs for testing purposes:

sandpit dot

digraph G {
        node [label="\N"];
        node [style=filled, color="#1f3950",fontcolor="#eeeeee",shape=box]; 
        subgraph cluster_market_data {
                graph [label="market data", color="#909090"];
                {rank=min; dataaggregator [shape=egg,color="#cc11cc22",fontcolor="#101010",label="market(s)"];}
                localport [label="local market data stream"];
                dataaggregator -> localport [dir=both];
        }
        subgraph cluster_offwire {
                graph [label="offwire",
                        color="#909090"];
                offwirealgo [label="offline algo"];
                observer;
                databases;
                observer -> databases [color=red,label="write",fontcolor=red];
        }
        subgraph cluster_onwire {
                graph [label="onwire",
                        color="#909090"];
                node [style=filled];
                disruptor [label="event server"];
                eventalgo [label="algo"];
                controller;
                controller -> eventalgo [color="#aaaaaa",dir=both]
                disruptor -> listener;
                disruptor -> eventalgo;
                disruptor -> controller;
                controller -> disruptor [color="#0080ff"];
        }
        subgraph cluster_broker {
                graph [label="broker",
                        color="#909090"];
                brokeraggregator [shape=egg,color="#cc11cc22",fontcolor="#101010",label="broker(s)"];
                brokeraggregator -> trader [dir=both];
        }
        localport -> observer [color="#aaaaaa",style=dotted];
        controller -> localport [color="#aaaaaa"];
        localport -> disruptor [color="#0080ff"];
        listener -> observer [color="#aaaaaa",style=dotted];
        controller -> observer [color="#aaaaaa",style=dotted];
        controller -> trader [color="#aaaaaa",dir=both];
        controller -> offwirealgo [color="#aaaaaa",dir=both];
        databases -> offwirealgo [color=red,label="read",fontcolor=red];
        trader -> observer [color="#aaaaaa",style=dotted];
        eventalgo -> observer [color="#aaaaaa",style=dotted];
        offwirealgo -> observer [color="#aaaaaa",style=dotted];
}

dot/sandpit2.png

test.unit1.dot

digraph G {
        node [label="\N"];
        subgraph cluster_market_data {
                graph [label="market data"];
                node [style=filled,
                        color=white];
                edge [dir=both];
                exchange -> aggregator;
                aggregator -> localport [style=filled, fillcolor=lightgrey, shape=box];
        }
        subgraph cluster1 {
                graph [label=controller,
                        color=blue];
                node [style=filled];
                observer -> controller;
        }
        subgraph cluster3 {
                graph [label="multi thread",
                        color=red];
                node [style=filled];
                database -> multithreadalgo;
        }
        subgraph cluster2 {
                graph [label="event stream",
                        color=blue];
                node [style=filled];
                disruptor -> listener;
                disruptor -> eventalgo;
        }
        subgraph cluster4 {
                brokers -> trader;
        }
        localport -> observer;
        controller -> localport;
        localport -> disruptor;
        disruptor -> controller;
        disruptor -> observer;
        controller -> disruptor;
        listener -> database;
        eventalgo -> multithreadalgo;
        controller -> trader;
        trader -> observer;
        eventalgo -> controller;
        multithreadalgo -> controller;
        observer -> database;
}

controller

Priority tasks:

  • [ ] relate to dot code in candidate dot

    The idea is to start with a dot graph and use this to register each component and the messaging between components.

    • [ ] register nodes from candidate dot
    • [ ] swap dotText in controller.hs from file to hardcoded string (thus removing IO issues)
    • [ ] register edges (which will use STM or common messaging systems)

The controller module is both a component of the overall system and is the complete system.

To (eventually) compile and run the hft project, compile and run the following code:

controller.hs

-- Example
--
-- $ ghc --make controller.hs
-- $ ./controller
import ControllerTest
import System.Environment
import Data.Maybe

main :: IO ()
main = do
     a <- getArgs
     let f = fromMaybe "../dot/candidate.dot" $ listToMaybe a 
     dotGraph <- importDotFile f 
     putStrLn "nodes:"
     putStrLn $ show $ nodeList dotGraph
     putStrLn "connections:"
     putStrLn $ show $ edgeList dotGraph
     return ()

controllertest.hs

module ControllerTest 
( importDotFile
, importDot
, printGraph
, nodeList
, edgeList
) where

import Data.GraphViz
import qualified Data.Text.Lazy as L
import qualified Data.Text.Lazy.IO as I
import qualified Data.GraphViz.Types.Generalised as G
import Data.Graph.Inductive.Graph

importDotFile :: FilePath -> IO (G.DotGraph String)
importDotFile f = do
        dotText <- I.readFile f 
        return $ parseDotGraph dotText

importDot :: L.Text -> G.DotGraph Node
importDot s = parseDotGraph s

printGraph :: G.DotGraph String -> IO ()
printGraph d = do
        putStrLn $ L.unpack $ printDotGraph d
        return()

nodeList :: G.DotGraph String -> [String]
nodeList g = map nodeID $ graphNodes g

edgeList :: G.DotGraph String -> [(String,String)]
edgeList g =  map (\x -> (fromNode x, toNode x)) $ graphEdges g

edges

import ControllerTest
g <- importDotFile "../dot/test.unit2.dot"
edgeList g

nodes

import ControllerTest
import Data.List
g <- importDotFile "../dot/test.unit2.dot"
map (\x -> [x]) $ nodeList g

commandline

cd ~/projects/hft/haskell
ghc --make controller.hs
./controller

market data feed

choices

There is no such thing as live market data for free (please let us know if this is wrong!).

The closest to free data is the Interactive Brokers feed. IB consolidate market data and post every 0.3 seconds however, making it unsuitable for testing lower-latency ideas.

Initial testing of market data is concentrating on iqfeed.

  • iqfeed is the cheapest “unencumbered” market data feed option
  • it can be downloaded for free and a demo account used for testing (data is delayed)

Now the bad news:

  • iqfeed exists only as windows software
  • the process is hardwired to communicate via a tcp connection.
  • version 4.9 does not include millisec information. 5.0 does though and is coming to the free client (eventually).
  • the feed has a habit of going down several times a day so that there will be gaps in the event stream.
  • you will need a login id and password to use the software which you get in a free trial

iqfeed

other choices

Tick Data Downloader Kinetick - Streaming real time quotes and historical market data - features

Port comms

There are 4 main communication points to iqfeed:

Level1Port 5009 Streaming Level 1 Data and News Level2Port 9200 Streaming Market Depth and NASDAQ Level 2 Data LookupPort 9100 Historical Data, Symbol Lookup, News Lookup, and Chains Lookup information AdminPort 9300 Connection data and management.

More information can be obtained at DTN IQFeed Developer Area or https://www.iqfeed.net/dev/main.cfm (for a price).

Setup info

iqfeed is available for download via http://www.iqfeed.net/index.cfm?displayaction=support&section=download

Personally, my development environment is on a mac so I need to start and manage the process via wine.

From the command line:

For the demo product (delayed feed):

wine "Z:\\Users\\tonyday\\wine\\iqfeed\\iqconnect.exe" -product IQFEED_DEMO -version 1
nc localhost 5009

For a live account:

wine "Z:\\Users\\tonyday\\wine\\iqfeed\\iqconnect.exe" ‑product yourproductid ‑version 0.1 ‑login yourlogin ‑password yourpassword -autoconnect -savelogininfo

R interfacing

Using R to read the raw feed proceeds along the following lines:

msg3<-"function=subscribe|item=MI.EQCON.1|schema=last_price;ask;bid" msg4<-"function=unsubscribe" 
#open socket connection 

socketPointer<-socketConnection('localhost', port=5333, server=FALSE) 
#subscribe 

writeLines(msg3, socketPointer) 
#read data from file 
readLines(con=socketPointer,n=1,ok=TRUE,warn=TRUE,encoding='UTF-8') 
#unsubscribe 

writeLines(msg4, socketPointer) 
#close socket 

close(socketPointer)
rm(list = ls())
code.startup = system2("wine", "\"Z:\\\\Users\\\\tonyday\\\\wine\\\\iqfeed\\\\iqconnect.exe\"", stdout="", stderr="",wait=FALSE)
Sys.sleep(10)
socketAdmin=socketConnection('localhost', port=9300, open="a+") 
Sys.sleep(1)
if (isOpen(socketAdmin)) {
  response.initial.stream = readLines(socketAdmin)
  print(response.initial.stream)
} else {
  print("login failed")
}

R sucks at asynchronous programming.

haskell interfacing

feed

no automation or control yet

  • all incoming data gets written to a file specified in args
  • input via stdin

To compile and run:

cd haskell
ghc --make feed.hs threaded
./feed data.out
import Control.Concurrent
import Network
import System.Environment
import System.Process
import System.IO
import Control.Exception
import System.Exit
import Control.Monad (forever)
import Data.Time.Clock
import Data.Time.Format
import Data.Time.Calendar
import System.Locale


con :: String -> String -> IO ()
con host port = do
    h <- connectTo host $ PortNumber $ toEnum $ read port
    hSetBuffering stdout LineBuffering
    hSetBuffering h      LineBuffering
    done <- newEmptyMVar

    _ <- forkIO $ (hGetContents h >>= putStr)
                `finally` tryPutMVar done ()

    _ <- forkIO $ (getContents >>= hPutStr h)
                `finally` tryPutMVar done ()

                -- Wait for at least one of the above threads to complete
    takeMVar done

conFileTime :: String -> String -> String -> IO ()
conFileTime host port file = do
    h <- connectTo host $ PortNumber $ toEnum $ read port
    f <- openFile file WriteMode
    hSetBuffering stdout LineBuffering
    hSetBuffering h      LineBuffering
    hSetBuffering f      LineBuffering
    done <- newEmptyMVar

    _ <- forkIO $ forever (do
                        t <- getCurrentTimeString
                        st <- hGetLine h
                        hPutStrLn f $ t ++ "," ++ st)
                `finally` tryPutMVar done ()

    _ <- forkIO $ (getContents >>= hPutStr h)
                `finally` tryPutMVar done ()

                -- Wait for at least one of the above threads to complete
    takeMVar done

conAdmin :: String -> IO ()
conAdmin cmds = do
  con "localhost" "9300"
  putStr cmds

conStream :: String -> IO ()
conStream cmds = do
  con "localhost" "5009"
  putStr cmds

conLookup :: String -> IO ()
conLookup cmds = do
  con "localhost" "9100"
  putStr cmds

logon :: IO ()
logon = do
  let cmd = "wine"
      args = ["Z:\\Users\\tonyday\\wine\\iqfeed\\iqconnect.exe", "-product IQFEED_DEMO -version 1"]
  _ <- rawSystem cmd args
  return()


getCurrentTimeString :: IO String
getCurrentTimeString = do
   now <- getCurrentTime
   let offset = diffUTCTime  (UTCTime (ModifiedJulianDay 0) (secondsToDiffTime 0)) (UTCTime (ModifiedJulianDay 0) (secondsToDiffTime (4 * 60 * 60)))
   return (formatTime defaultTimeLocale "%H:%M:%S%Q" $ addUTCTime offset now)


main :: IO ExitCode
main = do
  [file] <- getArgs
  _ <- forkIO (logon)
  threadDelay $ 1000000 * 10
  putStr "\ndelay finished\n"
  conFileTime "localhost" "5009" file
  return(ExitSuccess)

NEXT connect

In development: feed control and management

  • [ ] control process
    • try to connect to Admin
    • if connection refused, logon
    • try again
    • limit attempts
    • admin listening to maintain connection
    • open stream data (port 5009)
    • issue instructions
    • logger
  • [ ] process monitor
    • [ ] timer
    • [ ] counts
  • [ ] history lookups
  • [ ] news information processing
  • [ ] error reporting
    • [ ] dodgy trades and quotes
    • [ ] specials
import Control.Concurrent
import Network
import System.Environment
import System.Process
import System.IO
import Control.Exception
import System.Exit
import Control.Monad (forever)
import Data.Time.Clock
import Data.Time.Format
import System.Locale
import Text.Regex.TDFA


conWrapped :: String -> String -> IO ()
conWrapped host port = do
    h <- try (connectTo host $ PortNumber $ toEnum $ read port) :: IO (Either SomeException Handle)
    case h of
      Left ex -> case () of _ 
                              | "connect: does not exist" =~ show ex  -> logon
                              | otherwise -> putStrLn $ "Caught Exception: " ++ show ex
 
      Right val -> hGetContents val >>= putStr
    return ()


conLogin :: String -> String -> IO ()
conLogin host port = do
    h <- try (connectTo host $ PortNumber $ toEnum $ read port) :: IO (Either SomeException Handle)
    case h of
      Left ex -> putStrLn $ "Caught Exception: " ++ show ex
      Right val -> hGetContents val >>= putStr
    return ()
    

con :: String -> String -> IO ()
con host port = do
    h <- connectTo host $ PortNumber $ toEnum $ read port
    hSetBuffering stdout LineBuffering
    hSetBuffering h      LineBuffering
    done <- newEmptyMVar

    _ <- forkIO $ (hGetContents h >>= putStr)
                `finally` tryPutMVar done ()

    _ <- forkIO $ (getContents >>= hPutStr h)
                `finally` tryPutMVar done ()

                -- Wait for at least one of the above threads to complete
    takeMVar done


conFileTime :: String -> String -> String -> IO ()
conFileTime host port file = do
    h <- connectTo host $ PortNumber $ toEnum $ read port
    f <- openFile file WriteMode
    hSetBuffering stdout LineBuffering
    hSetBuffering h      LineBuffering
    hSetBuffering f      LineBuffering
    done <- newEmptyMVar

    _ <- forkIO $ forever (do
                        t <- getCurrentTimeString
                        st <- hGetLine h
                        hPutStrLn f $ t ++ "," ++ st)
                `finally` tryPutMVar done ()

    _ <- forkIO $ (getContents >>= hPutStr h)
                `finally` tryPutMVar done ()

                -- Wait for at least one of the above threads to complete
    takeMVar done

conAdmin :: String -> IO ()
conAdmin cmds = do
  con "localhost" "9300"
  putStr cmds

conStream :: String -> IO ()
conStream cmds = do
  con "localhost" "5009"
  putStr cmds

conLookup :: String -> IO ()
conLookup cmds = do
  con "localhost" "9100"
  putStr cmds

logon :: IO ()
logon = do
  let cmd = "wine"
      args = ["Z:\\Users\\tonyday\\wine\\iqfeed\\iqconnect.exe", "-product IQFEED_DEMO -version 1"]
  _ <- rawSystem cmd args
  return()


getCurrentTimeString :: IO String
getCurrentTimeString = do
   now <- getCurrentTime
   return (formatTime defaultTimeLocale "%H:%M:%S%Q" now)


main :: IO ExitCode
main = do
  [file] <- getArgs
  -- _ <- forkIO (logon)
  -- threadDelay $ 1000000 * 6
  -- putStr "\ndelay finished\n"
  conFileTime "localhost" "5009" file
  return(ExitSuccess)

threading example

from http://www.haskell.org/haskellwiki/Background_thread_example

import Control.Monad
import Control.Concurrent
import Control.Exception as E
import Control.Concurrent.STM

type Work = IO ()

type SendWork = Work -> STM ()

spawnWorkers :: Int -> IO (SendWork,IO ())
spawnWorkers i | i <= 0 = error "Need positive number of workers"
               | otherwise = do
    workChan <- atomically newTChan
    runCount <- atomically (newTVar i)
    let stop = atomically (writeTVar runCount . pred =<< readTVar runCount)
        die e = do id <- myThreadId
                   print ("Thread "++show id++" died with exception "++show (e :: ErrorCall))
                   stop
        work = do mJob <- atomically (readTChan workChan)
                  case mJob of Nothing -> stop
                               Just job -> E.catch job die >> work
    replicateM_ i (forkIO work)
    let stopCommand = do atomically (replicateM_ i (writeTChan workChan Nothing))
                         atomically (do running <- readTVar runCount
                                        when (running>0) retry)
    return (writeTChan workChan . Just,stopCommand)

printJob :: Int -> IO ()
printJob i = do threadDelay (i*1000)
                id <- myThreadId
                print ("printJob took "++show i++" ms in thread "++show id)

main :: IO ()
main = do
  (submit,stop) <- spawnWorkers 10
  mapM_ (atomically . submit . printJob) (take 40 (cycle [100,200,300,400]))
  atomically $ submit (error "Boom")
  stop

latency research

I collected trade and order ticks for 12 contracts on 14th March from iqfeed, and timestamped each tick with current system time. There are two different potential points at which to measure latency:

  • iqfeed sends a ping every second, and
  • each quote has a relevant market timestamp to the millisecond

feed ping latency

From the raw iqfeed heartbeat:

t = read.csv("data/streamt.txt",header=FALSE,as.is=TRUE)
pingtime = strptime(t[,3], "%Y%m%d %H:%M:%S")
stamp = strptime(paste(strftime(pingtime,"%Y%m%d"), t[,1], sep=" "), "%Y%m%d %H:%M:%OS")    
latency = as.double(stamp - pingtime)
df = data.frame(pingtime=pingtime, latency=latency)
summary(df)
require(ggplot2)
qplot(data=df, x=pingtime, y=latency)
ggsave("ping-latency.svg")

data/ping-latency.svg

The simple scatterplot shows many negative values, especially when the market is open, and a step jump in the later pings (when no quotes were being recorded). These jumps may be due to changes in my system clock (automatic appletime resolutions) or due to a lack of accuracy in the iqfeed pings.

Scatterplots tend to provide dubious visualisation for bigdata, and a new package out that helps is bigvis.

bigvis is not yet available at CRAN but can be installed via a github repository (see https://github.com/hadley/bigvis for details).

install.packages("devtools")
devtools::install_github("bigvis")

bigvis doesn’t handle non-numeric data (like time), so rather than autopilot, I use ggplot directly.

require(bigvis)
require(ggplot2)
dfn = condense(bin(as.double(df$pingtime),60),bin(df$latency,.1))
dfg = data.frame(as.POSIXct(dfn[,1],origin="1960-01-01", tz="GMT"),dfn[,2],dfn[,3])
colnames(dfg) = c("Time","Latency","Count")
g = ggplot(data=dfg,aes(x=Time,y=Latency))
g + geom_tile(aes(fill=Count)) + scale_fill_gradient(low="#e5e5e5", high = "#444548") + scale_y_continuous(limits=c(-1,1))
ggsave("img/ping-latency-condensed.png")

img/ping-latency-condensed.png

Using the bigvis techniques clarifies a few main issues for further research:

  • there is a step jump near market open where the majority of the pings jump from around 250 msecs to -750 msecs. This looks like either a coding error or the ping being off by up to a second.
  • during market open (when tick volume is high) ping can vary by a second.

disconnects

Just looking at the ping counts after binning into one minute intervals:

df.dis = condense(bin(as.double(df$pingtime),60))
dfg = data.frame(as.POSIXct(df.dis[,1],origin="1960-01-01", tz="GMT"),60-df.dis[,2])
colnames(dfg) = c("Time","Count")
g = ggplot(data=dfg,aes(x=Time,y=Count))
g + geom_line(aes())
ggsave("img/disconnects.png")

img/disconnects.png

iqfeed regularly suffers from disconnects with reconnection occuring within a minute.

event latency

from the R database of the one day quote ticks…

  • open data
    rm(list = ls())
    require("mmap")
    require("rindex")
    require("plyr")
    require("stringr")
    raw.stream = "streamqh"
    # where the mmap db is located
    db.path = paste("data/",raw.stream,"/",sep="")
    
    load(paste(db.path,".Rdbinfo",sep=""))
    #m = mmap(main.filename, mode=st)
    stream = NULL
    stream$stamp = mmap(paste(db.path,fields[1],".data",sep=""), mode=double())
    stream$code = mmap(paste(db.path,fields[2],".data",sep=""), mode=char(1))
    stream$symbol = mmap(paste(db.path,fields[3],".data",sep=""), mode=char(ticker.length))
    stream$trade = mmap(paste(db.path,fields[4],".data",sep=""), mode=double())
    stream$vol = mmap(paste(db.path,fields[5],".data",sep=""), mode=integer())
    stream$tradetime = mmap(paste(db.path,fields[6],".data",sep=""), mode=double())
    stream$tradeex = mmap(paste(db.path,fields[7],".data",sep=""), mode=double())
    stream$volex = mmap(paste(db.path,fields[8],".data",sep=""), mode=integer())
    stream$tradetimeex = mmap(paste(db.path,fields[9],".data",sep=""), mode=double())
    stream$voltot = mmap(paste(db.path,fields[10],".data",sep=""), mode=integer())
    stream$bid = mmap(paste(db.path,fields[11],".data",sep=""), mode=double())
    stream$bidvol = mmap(paste(db.path,fields[12],".data",sep=""), mode=integer())
    stream$bidtime = mmap(paste(db.path,fields[13],".data",sep=""), mode=double())
    stream$ask = mmap(paste(db.path,fields[14],".data",sep=""), mode=double())
    stream$askvol = mmap(paste(db.path,fields[15],".data",sep=""), mode=integer())
    stream$asktime = mmap(paste(db.path,fields[16],".data",sep=""), mode=double())
    stream$event = mmap(paste(db.path,fields[17],".data",sep=""), mode=char(12))
    stream$id = mmap(paste(db.path,fields[18],".data",sep=""), mode=integer())
    
        
  • Define events and extract relevant times
    n = length(stream$event[])
    
    tC = grepl("C",stream$event[])
    tO = grepl("O",stream$event[])
    ta = grepl("a",stream$event[])
    tb = grepl("b",stream$event[])
    ta = ta & !(tC | tO)
    tb = tb & !(tC | tO | ta)
    tother = !(ta | tb | tC | tO)
    
    event.category = (1 * tC) + (2 * tO) + (3 * ta) + (4 * tb) + (5 * tother)
    
    event.time = (stream$tradetime[] * tC +
            stream$tradetimeex[] * tO +
            stream$asktime[] * ta +
            stream$bidtime[] * tb +
            stream$tradetime[] * tother)
    
    event.time.posix = as.POSIXct(event.time,origin="1960-01-01", tz="GMT")
    event.stamp = stream$stamp[]
    
    event.latency = event.stamp - event.time  
    
    event.df = data.frame(symbol=stream$symbol[],event.category,event.time, event.stamp, event.latency)
    summary(event.df)
        
  • bigvis manipulations
    require("bigvis")
    require("ggplot2")
    df1 = condense(bin(event.df$event.time,60),bin(event.df$event.latency,0.05))
    df2 = df1[(df1$event.df.event.latency > 0) & (df1$event.df.event.latency < 1),]   
    dfg = data.frame(as.POSIXct(df2[,1]+10*60*60,origin="1960-01-01", tz=""),df2[,2],df2[,3])
    colnames(dfg) = c("Time","Latency","Count")
    g = ggplot(data=dfg,aes(x=Time,y=Latency))
    g + geom_tile(aes(fill=Count)) + scale_fill_gradient(low="#e5e5e5", high = "#444548") + scale_y_continuous(limits=c(-1,1))
    ggsave("img/quote-latency-condensed.svg")
    
        

    img/quote-latency-condensed.svg

    Unlike the iqfeed ping, there is a consistent latency pattern when comparing market stamp and local system stamp, with no spurious negative values.

  • symbols
    summary(as.factor(stream$symbol[]))
        
  • emini latency
    ind.emini = indexEQ(ind.symbol,"@ESM13 ")
    df1 = condense(bin(event.df$event.time[ind.emini],600),bin(event.df$event.latency[ind.emini],0.05))
    df2 = df1[(df1$event.df.event.latency > -1) & (df1$event.df.event.latency < 10),]   
    dfg = data.frame(as.POSIXct(df2[,1]+10*60*60,origin="1960-01-01", tz=""),df2[,2],df2[,3])
    colnames(dfg) = c("Time","Latency","Count")
    g = ggplot(data=dfg,aes(x=Time,y=Latency))
    g + geom_tile(aes(fill=Count)) + scale_fill_gradient(low="#e5e5e5", high = "#444548") + scale_y_continuous(limits=c(-1,1))
    ggsave("img/quote-latency-condensed-emini.svg")
        
  • average latency (with binning)
    require(ggplot2)
    require(bigvis)
    ind.emini = indexEQ(ind.symbol,"@ESM13 ")
    df1 = condense(bin(event.df$event.time[ind.emini],300,name="time"),bin(event.df$event.latency[ind.emini],0.05,name="latency"))
    df2 = df1[(df1$latency > 0) & (df1$latency < 2),]
    lat.av = tapply(df2$latency*df2$.count,df2$time,sum)/tapply(df2$.count,df2$time,sum)
    dfg = data.frame(Time=as.POSIXct(as.double(row.names(lat.av))+10*60*60,origin="1960-01-01", tz=""),Latency=lat.av)
    #colnames(dfg) = c("Time","Latency","Count")
    g = ggplot(data=dfg,aes(x=Time,y=Latency))
    g + geom_point()
    ggsave("img/quote-latency-averagecondensed.svg")
        

latency research

feed ping latency

From the raw iqfeed heartbeat:

t = read.csv("data/streamt.txt",header=FALSE,as.is=TRUE)
pingtime = strptime(t[,3], "%Y%m%d %H:%M:%S")
stamp = strptime(paste(strftime(pingtime,"%Y%m%d"), t[,1], sep=" "), "%Y%m%d %H:%M:%OS")    
latency = as.double(stamp - pingtime)
df = data.frame(pingtime=pingtime, latency=latency)
summary(df)
require(ggplot2)
qplot(data=df, x=pingtime, y=latency)
ggsave("img/ping-latency.svg")

img/ping-latency.svg

install.packages("devtools")
devtools::install_github("bigvis")

http://vita.had.co.nz/papers/bigvis.html

  • switch to numeric
  • 2d bin
dfn = condense(bin(as.double(df$pingtime),60),bin(df$latency,.1))
dfg = data.frame(as.POSIXct(dfn[,1],origin="1960-01-01", tz="GMT"),dfn[,2],dfn[,3])
colnames(dfg) = c("Time","Latency","Count")
g = ggplot(data=dfg,aes(x=Time,y=Latency))
g + geom_tile(aes(fill=Count)) + scale_fill_gradient(low="#e5e5e5", high = "#444548")
ggsave("ping-latency-condensed.svg")

event latency

from the R database

  1. open data
    rm(list = ls())
    require("mmap")
    require("rindex")
    require("plyr")
    require("stringr")
    raw.stream = "streamqh"
    # where the mmap db is located
    db.path = paste("data/",raw.stream,"/",sep="")
    
    load(paste(db.path,".Rdbinfo",sep=""))
    #m = mmap(main.filename, mode=st)
    stream = NULL
    stream$stamp = mmap(paste(db.path,fields[1],".data",sep=""), mode=double())
    stream$code = mmap(paste(db.path,fields[2],".data",sep=""), mode=char(1))
    stream$symbol = mmap(paste(db.path,fields[3],".data",sep=""), mode=char(ticker.length))
    stream$trade = mmap(paste(db.path,fields[4],".data",sep=""), mode=double())
    stream$vol = mmap(paste(db.path,fields[5],".data",sep=""), mode=integer())
    stream$tradetime = mmap(paste(db.path,fields[6],".data",sep=""), mode=double())
    stream$tradeex = mmap(paste(db.path,fields[7],".data",sep=""), mode=double())
    stream$volex = mmap(paste(db.path,fields[8],".data",sep=""), mode=integer())
    stream$tradetimeex = mmap(paste(db.path,fields[9],".data",sep=""), mode=double())
    stream$voltot = mmap(paste(db.path,fields[10],".data",sep=""), mode=integer())
    stream$bid = mmap(paste(db.path,fields[11],".data",sep=""), mode=double())
    stream$bidvol = mmap(paste(db.path,fields[12],".data",sep=""), mode=integer())
    stream$bidtime = mmap(paste(db.path,fields[13],".data",sep=""), mode=double())
    stream$ask = mmap(paste(db.path,fields[14],".data",sep=""), mode=double())
    stream$askvol = mmap(paste(db.path,fields[15],".data",sep=""), mode=integer())
    stream$asktime = mmap(paste(db.path,fields[16],".data",sep=""), mode=double())
    stream$event = mmap(paste(db.path,fields[17],".data",sep=""), mode=char(12))
    stream$id = mmap(paste(db.path,fields[18],".data",sep=""), mode=integer())
    
        
  1. Define events and extract relevant times
    n = length(stream$event[])
    
    tC = grepl("C",stream$event[])
    tO = grepl("O",stream$event[])
    ta = grepl("a",stream$event[])
    tb = grepl("b",stream$event[])
    ta = ta & !(tC | tO)
    tb = tb & !(tC | tO | ta)
    tother = !(ta | tb | tC | tO)
    
    event.category = (1 * tC) + (2 * tO) + (3 * ta) + (4 * tb) + (5 * tother)
    
    event.time = (stream$tradetime[] * tC +
            stream$tradetimeex[] * tO +
            stream$asktime[] * ta +
            stream$bidtime[] * tb +
            stream$tradetime[] * tother)
    
    event.time.posix = as.POSIXct(event.time,origin="1960-01-01", tz="GMT")
    event.stamp = stream$stamp[]
    
    event.latency = event.stamp - event.time  
    
    event.df = data.frame(symbol=stream$symbol[],event.category,event.time, event.stamp, event.latency)
    summary(event.df)
        
  1. bigvis manipulations
    require("bigvis")
    require("ggplot2")
    df1 = condense(bin(event.df$event.time,60),bin(event.df$event.latency,0.05))
    df2 = df1[(df1$event.df.event.latency > 0) & (df1$event.df.event.latency < 1),]   
    dfg = data.frame(as.POSIXct(df2[,1]+10*60*60,origin="1960-01-01", tz=""),df2[,2],df2[,3])
    colnames(dfg) = c("Time","Latency","Count")
    g = ggplot(data=dfg,aes(x=Time,y=Latency))
    g + geom_tile(aes(fill=Count)) + scale_fill_gradient(low="#e5e5e5", high = "#444548")
    ggsave("quote-latency-condensed.svg")
    
        
  1. symbols
    summary(as.factor(stream$symbol[]))
        
  2. emini latency
    ind.emini = indexEQ(ind.symbol,"@ESM13 ")
    df1 = condense(bin(event.df$event.time[ind.emini,],60),bin(event.df$event.latency[ind.emini,],0.05))
    df2 = df1[(df1$event.df.event.latency > -1) & (df1$event.df.event.latency < 10),]   
    dfg = data.frame(as.POSIXct(df2[,1]+10*60*60,origin="1960-01-01", tz=""),df2[,2],df2[,3])
    colnames(dfg) = c("Time","Latency","Count")
    g = ggplot(data=dfg,aes(x=Time,y=Latency))
    g + geom_tile(aes(fill=Count)) + scale_fill_gradient(low="#e5e5e5", high = "#444548")
    ggsave("quote-latency-condensed-emini.svg")
    
        

R database

Experimenting with the mmap package in R, using this as a roll-your-own column database.

Starting with the raw market event stream:

basic analytics

  • Count Code Types
    require("hash")
    #inFile = "data/stream.100k.txt" 
    inFile = "data/data.all.out.txt" 
    inCon = file(inFile, open = "r")  
    h <- hash()
      
    while (length(lines <- readLines(inCon, n=200, warn=FALSE)) > 0) {
      s = strsplit(lines,",")
      for (x1 in 1:length(s)) {
        c = s[[x1]][2]
        if (has.key(c,h)) {
          h[[c]] = h[[c]] + 1
        } else {
          h[[c]] = 1
        }
      }
    }
      
        
    h
        

stream to mmap

makedb

#head -n 100 streamq.100k.txt > streamq.100.txt
cat header.txt streamq.100k.txt > streamqh.100k.txt

libraries

rm(list = ls())
require("mmap")
require("rindex")
require("plyr")
require(stringr)

variables

# stream with field header
raw.stream = "streamqh"
# where the mmap db will be located
db.path = paste("data/",raw.stream,"/",sep="")
# mmap of the entire row
main.filename = paste("data/",raw.stream,"/main.data",sep="")
# file containing the raw feed
file.csv.data = paste("data/",raw.stream,".txt",sep="")
# maximum character length of the event field
event.size = 12
# maximum character length of the id field
id.size = 12

slurp in raw data (mmap)

mmap.csv was difficult to work with when there were blanks entries. These translated as NA when slurped up by read.table which is a logical type and thus not supported by mmap.

Once past this hurdle, adhoc analysis of the larger data set is painless despite size issues.

  • mmap.csv hack
      
    my.mmap.csv = function(file,
      file.mmap = NA,
      header = TRUE, 
      sep = ",", 
      quote = "\"", 
      dec = ".", 
      fill = TRUE, 
      comment.char = "", 
      row.names,
      actualColClasses = NA,
      ...)
    {
        ncols <- length(gregexpr(sep, readLines(file, 1))[[1]]) + 
            1
        mcsv <- tempfile()
        tmplist <- vector("list", ncols)
        cnames <- character(ncols)
        if (!missing(row.names) && is.numeric(row.names) && length(row.names) == 
            1L) 
            ncols <- ncols - 1
        for (col in 1:ncols) {
            colclasses <- rep("NULL", ncols)
            if (!missing(actualColClasses)) {
              colclasses[col] <- actualColClasses[col]
            } else {
              colclasses[col] <- NA
            } 
            clm <- read.table(file = file, header = header, sep = sep, 
                quote = quote, dec = dec, fill = fill, comment.char = comment.char, 
                colClasses = colclasses, stringsAsFactors = FALSE, 
                row.names = row.names, ...)
            cnames[col] <- colnames(clm)
            tmplist[[col]] <- as.mmap(clm[, 1], force = TRUE)
        }
        stype <- do.call(struct, lapply(tmplist, function(X) X$storage.mode))
        totalsize <- sum(sapply(tmplist, nbytes))
        if (is.na(file.mmap)) {
          tmpstruct <- tempfile()
        } else {
          tmpstruct = file.mmap
        }
        writeBin(raw(totalsize), tmpstruct)
        tmpstruct <- mmap(tmpstruct, stype)
        for (col in 1:ncols) {
            tmpstruct[, col] <- tmplist[[col]][]
        }
        colnames(tmpstruct) <- cnames
        extractFUN(tmpstruct) <- as.data.frame
        tmpstruct
    }
    
        
  • store mmap’ed raw stream in m
    dir.create(db.path)
    
    colclasses = as.vector(c("character", "character", "character", "numeric", "integer", "character",
      "numeric", "integer", "character", "integer", "numeric", "integer", "character",
      "numeric", "integer", "character", "character", "integer", "character", "character","character"))
    
    m = my.mmap.csv(file=file.csv.data, file.mmap=main.filename, header=TRUE, actualColClasses=colclasses)
    head(m)
    st = m$storage.mode
    ticker.length =  nbytes(st$Symbol) - 1
        

fields

colnames(m[])

conversion to column db

create mmaps for each column
stream = NULL
stream$stamp = as.mmap(as.double(strptime(m[]$Stamp, "%H:%M:%OS",tz="GMT")),file=paste(db.path,"stamp.data",sep=""), mode=double())
stream$code = as.mmap(as.character(m[]$Code),file=paste(db.path,"code.data",sep=""), mode=char(1))
stream$symbol = as.mmap(as.character(m[]$Symbol),file=paste(db.path,"symbol.data",sep=""), mode=char(ticker.length))
stream$trade = as.mmap(m[]$Most.Recent.Trade,file=paste(db.path,"trade.data",sep=""), mode=double())
stream$vol = as.mmap(m[]$Most.Recent.Trade.Size,file=paste(db.path,"vol.data",sep=""), mode=integer())
stream$tradetime = as.mmap(as.double(strptime(as.character(m[]$Most.Recent.Trade.TimeMS), "%H:%M:%OS",tz="GMT")),file=paste(db.path,"tradetime.data",sep=""), mode=double())
stream$tradeex = as.mmap(m[]$Extended.Trade,file=paste(db.path,"tradeex.data",sep=""), mode=double())
stream$volex = as.mmap(m[]$Extended.Trade.Size,file=paste(db.path,"volex.data",sep=""), mode=integer())
stream$tradetimeex = as.mmap(as.double(strptime(as.character(m[]$Extended.Trade.TimeMS), "%H:%M:%OS",tz="GMT")),file=paste(db.path,"tradetimeex.data",sep=""), mode=double())
stream$voltot = as.mmap(m[]$Total.Volume,file=paste(db.path,"voltot.data",sep=""), mode=integer())
stream$bid = as.mmap(m[]$Bid,file=paste(db.path,"bid.data",sep=""), mode=double())
stream$bidvol = as.mmap(m[]$Bid.Size,file=paste(db.path,"bidvol.data",sep=""), mode=integer())
stream$bidtime = as.mmap(as.double(strptime(as.character(m[]$Bid.TimeMS), "%H:%M:%OS",tz="GMT")),file=paste(db.path,"bidtime.data",sep=""), mode=double())
stream$ask = as.mmap(m[]$Ask,file=paste(db.path,"ask.data",sep=""), mode=double())
stream$askvol = as.mmap(m[]$Ask.Size,file=paste(db.path,"askvol.data",sep=""), mode=integer())
stream$asktime = as.mmap(as.double(strptime(as.character(m[]$Ask.TimeMS), "%H:%M:%OS",tz="GMT")),file=paste(db.path,"asktime.data",sep=""), mode=double())
stream$event = as.mmap( str_pad(as.character(m[]$Message.Contents), event.size, side = "right", pad = " "),file=paste(db.path,"event.data",sep=""), mode=char(event.size))
stream$id = as.mmap(m[]$TickID,file=paste(db.path,"id.data",sep=""), mode=integer())
create indices using rindex
require(rindex)
ind.stamp = index(as.character(stream$stamp[]))
ind.symbol = index(stream$symbol[])
ind.event = index(stream$event[])
ind.id = index(str_pad(as.character(stream$id[]), id.size, side = "left", pad = " "))
save indexes
fields = names(stream)
save(list = c("ind.stamp",
       "ind.symbol",
       "ind.event",
       "ind.id",
       "fields",
       "main.filename",
       "st",
       "ticker.length",
       "event.size",
       "id.size"
       ),
     file = paste(db.path,".Rdbinfo",sep=""))

reboot

rm(list = ls())
require("mmap")
require("rindex")
require("plyr")
require("stringr")
raw.stream = "streamqh"
# where the mmap db is located
db.path = paste("data/",raw.stream,"/",sep="")

load(paste(db.path,".Rdbinfo",sep=""))
#m = mmap(main.filename, mode=st)
stream = NULL
stream$stamp = mmap(paste(db.path,fields[1],".data",sep=""), mode=double())
stream$code = mmap(paste(db.path,fields[2],".data",sep=""), mode=char(1))
stream$symbol = mmap(paste(db.path,fields[3],".data",sep=""), mode=char(ticker.length))
stream$trade = mmap(paste(db.path,fields[4],".data",sep=""), mode=double())
stream$vol = mmap(paste(db.path,fields[5],".data",sep=""), mode=integer())
stream$tradetime = mmap(paste(db.path,fields[6],".data",sep=""), mode=double())
stream$tradeex = mmap(paste(db.path,fields[7],".data",sep=""), mode=double())
stream$volex = mmap(paste(db.path,fields[8],".data",sep=""), mode=integer())
stream$tradetimeex = mmap(paste(db.path,fields[9],".data",sep=""), mode=double())
stream$voltot = mmap(paste(db.path,fields[10],".data",sep=""), mode=integer())
stream$bid = mmap(paste(db.path,fields[11],".data",sep=""), mode=double())
stream$bidvol = mmap(paste(db.path,fields[12],".data",sep=""), mode=integer())
stream$bidtime = mmap(paste(db.path,fields[13],".data",sep=""), mode=double())
stream$ask = mmap(paste(db.path,fields[14],".data",sep=""), mode=double())
stream$askvol = mmap(paste(db.path,fields[15],".data",sep=""), mode=integer())
stream$asktime = mmap(paste(db.path,fields[16],".data",sep=""), mode=double())
stream$event = mmap(paste(db.path,fields[17],".data",sep=""), mode=char(12))
stream$id = mmap(paste(db.path,fields[18],".data",sep=""), mode=integer())
#save(list = ls(all=TRUE), file = paste(db.path,".Rsnap"))

stream -> price vector

event codes

codemeaning
EExtended Trade = Form T trade
OOther Trade = Any trade not accounted for by C or E
bA bid update occurred
aAn ask update occurred
oAn Open occurred
hA High occurred
lA Low occurred
cA Close occurred
sA Settlement occurred
library(ascii)
options(asciiType="org")
ascii(summary(as.factor(stream$event[])),header=T,include.colnames=T)

symbol event table

e = unique(as.factor(stream$event[]))
s = unique(as.factor(stream$symbol[]))
symbol.event.count = data.frame(array(NA,c(length(e),length(s))),row.names=e)
colnames(symbol.event.count) = s

for(x1 in 1:length(e)) {
  for(x2 in 1:length(s)) {
    symbol.event.count[x1,x2] = sum(stream$symbol[][stream$event[]==e[x1]]==s[x2])  
  }
}

symbol.event.count
@JYM13@YMM13EBK13EBM13@NQM13@EDM13@EDU13@ESM13+SK13@USNM13@F1M13@N1M13@T1M13
a512080547122434049629305681197912211515911
b51207505670946894275790723101443108547741
ba288595003075875437230131
C137481722046421032482944107691000
O005186970551790500000
Cohl0000000001000

one symbol processing of trades

n = length(stream$event[])
ts = indexEQ(ind.symbol,"@ESM13 ")
tt = grep("C|O",stream$event[])
tC = grep("C",stream$event[])
tO = grep("O",stream$event[])

trades = intersect(ts,tt)

price = (stream$trade[][trades] * grepl("C",stream$event[][trades])) + 
         stream$tradeex[][trades] * grepl("O",stream$event[][trades])
vol = (stream$vol[][trades] * grepl("C",stream$event[][trades])) + 
         stream$volex[][trades] * grepl("O",stream$event[][trades])
id = stream$id[][trades]

time = (stream$tradetime[][trades] * grepl("C",stream$event[][trades])) + 
         stream$tradetimeex[][trades] * grepl("O",stream$event[][trades])

time.posix = as.POSIXct(time,origin="1960-01-01", tz="GMT")
stamp = stream$stamp[][trades]

voltot = stream$voltot[][trades]

df = data.frame(price,vol,time.posix, voltot, stamp, id)
summary(df)

checks

  • id sequence
  • time sequence
  • stamp sequence
  • voltot = voltot + vol
sum(0 >= diff(df$id))
sum(0 > diff(df$time))
sum(0 >= diff(df$stamp))
sum(df$vol[-1] != diff(df$voltot))

voltot crosstable

e = unique(as.factor(stream$event[]))
s = unique(as.factor(stream$symbol[]))
symbol.event.stat = data.frame(array(NA,c(length(e),length(s))),row.names=e)
colnames(symbol.event.stat) = s

for(x1 in 1:length(e)) {
  for(x2 in 1:length(s)) {
    symbol.event.stat[x1,x2] = mean(stream$voltot[][stream$event[]==e[x1]]==s[x2])  
  }
}

symbol.event.stat
@JYM13@YMM13EBK13EBM13@NQM13@EDM13@EDU13@ESM13+SK13@USNM13@F1M13@N1M13@T1M13
a0000000000000
b0000000000000
ba0000000000000
C0000000000000
O0000000000000
Cohl0000000000000

price, vol and time plot

require(ggplot2)
g = ggplot(df, aes(x=time.posix,y=price,size=vol))
  g + geom_point()

reference

http://www.rinfinance.com/agenda/2012/talk/JeffRyan.pdf

Useless Factor: Why not mmap?

‎www.rinfinance.com/agenda/2012/talk/JeffRyan.pdf

‎cran.r-project.org/web/packages/mmap/mmap.pdf

r - mmap and csv files - Stack Overflow

lubridate/R at master · hadley/lubridate · GitHub

Rmetrics - Update price data on disk using mmap package

Sigmoid function - Wikipedia, the free encyclopedia

http://www-stat.stanford.edu/~tibs/ElemStatLearn/

http://www.bnosac.be/index.php/blog/26-massive-online-data-stream-mining-with-r

NEXT algo

There’s a few hypotheses guilding the evolution of the algorithm design:

  • whatever the complexity of an algorithm, the outcome can usually be represented as a distributional forecast of a security price.
  • specifically, this means that pairs (and other relative value) trades should be thought of as a (potentially) strong relationship between two securities rather than a forecast of relative value between two securities. With HFT, it becomes problematic when there is an implicit assumption that two trades can occur simultaneously and with certainty.
  • that a comparative advantage of the HFT project is in algorithm design rather than raw speed.

algo soup

A major part of this project is to invent a generic algorithm that encapsulates most other algorithmic design ideas.

To start somewhere, the steps below represent a thought experiment on a different approach to standard practices using pseudo-code.

step 1: boiler-plate trend following

The most common technical analysis in the world is to extract a trading signal using the difference between a short-run and long-run moving average of price:

n.short = 100000 n.long = 1000000 signal = sum(old.data.stream[1::n.long]) / n.long - sum(old.data.stream[1::n.short]) / n.short

step 2: generalising this pattern

Both the long and the short ma are calculated on the old.data.stream and are just different weighting schemes.

weight = matrix(1/n.long, 1, n.long) - matrix (1/n.short,1,n.short) signal = sum(weight * old.data.stream[1::nlong])

step 3: formulating the weighting scheme

The weighting scheme:

  • can be much more general than restricted to two simple moving averages
  • can represent a large subset of technical trading rules
  • artificially weights data that is one million ticks old the same as the last tick.

So,

better.weighting.scheme = prior.smooth.curve.declining.to.zero optimise(better.weighting.scheme) signal = sum(better.weighting.scheme * old.data.stream[1::n.long])

step 4: convert to a vector method

convert the signal to a vector method involving old values of itself (this is a tricky bit)

signal[0] = some.function(tricky.weighting.scheme, signal[1:nlong], old.data.stream[1:n.long])

Remember that old signal values are also just weighted sums of old.data.stream, so working out the tricky.weighting.scheme is pretty much just tricky diff algebra.

It also means that some.function looks pretty similar to just a sum (but there might be some non-linearity there)

step 5: reorient towards the event flow

Find that the importance of older old.data.stream values decays very quickly once you start to use past signal values. So quickly, in fact, that old.data.stream[3] (say) has a fairly low weight even! The importance of signal[1:n.long] decays less quickly.

So the signal calculation becomes:

signal[0] = sum(tricky.weighting.scheme, signal[1:n.not.so.long], old.data.stream[1:3])

step 6: iterate

Iterate steps 3,4 & 5 on the signal (which is itself now an algerbaic iteration of a weighting scheme), each time coming up with a new weighting scheme that reduces the dependence on old data.

The new weighting scheme can also be thought of as a new signal.

tricky.weights[0]=tricky.weighting.scheme signals[0] = signal for (x1=1:20) tricky.weights[x1] = optimise(minimise(n.not.so.long), signals[0:(x1-1)],tricky.weights[0:(x1-1)]) signals[x1] = some.function(signals, tricky.weights) end for

Now step 6 is the very tricky bit I havent fully thought through. I doubt a c loop will work for example. And I’m probably trying to reinvent something that already exists, like a principle component method or something. But what I think this process converges to is this:

signals[0] = some.linear.function(signals[1], old.data.stream[0]) => signals = funk(signals, linear.signal.functions, data.stream)

Finally …

There are a few other narratives which support this:

The end result looks very neat from a mathematical and computational point-of-view. This is the way the world is supposed to look, with an event stream, an information state (the signals) and an algorithm that relates event to change in state.

Having done all the tricky math, it’s easy to relate the original algorithm to the final result. In the original method: - there are 1,000,001 signals: 1,000,000 being the last one million prices and 1 being the calculated signal. - there is 1 linear.function, the dual ma crossover. Or you could think about it as 3 signals (long ma, short ma and the difference) in addition to the old.data.

The whole exercise could be thought of a combined compression and optimisation computation. Some parts of the story are simply about compressing the market data so it can compactly sit on the event stream. The algorithm needs to be transformed given it needs to operate on compressed data but it should spit out about the same answer (or there will be a speed - accuracy tradeoff). But some parts of the story are about looking at the algorithm in the light of getting it on the stream and thinking about how to do it better.

the point

The general point to an algo soup is to insert the algorithms directly into the event streamin the event stream as part of an actor framework. In other words, algorithms in the stream (functions/transforms/state-variables) shouldn’t use stream history to recompute themselves. They can only use the stream and other algorithms in the stream.

Consider the original algorithm (a simple moving average). Most algorithms recalculate every time so they look at the last million or so ticks (data events) every time they update themselves. Some might be smart enough to add the latest data and drop off old values. But a true streaming algorithm doesnt have to remember old values and can use an exponential decay method to keep track of the moving average. Thus the moving average algorithm becomes a state variable in the stream. And it also goes from being a summation of a million values to being a few bit shifts ;)

pseudo summary

better.weighting.scheme = prior.smooth.curve.declining.to.zero
optimise(better.weighting.scheme)
signal = sum(better.weighting.scheme * old.data.stream[1::n.long])
signal[0] = some.function(tricky.weighting.scheme, signal[1:nlong], old.data.stream[1:n.long])
signal[0] = sum(tricky.weighting.scheme, signal[1:n.not.so.long], old.data.stream[1:3])
tricky.weights[0]=tricky.weighting.scheme
signals[0] = signal
for (x1=1:20)
   tricky.weights[x1] = optimise(minimise(n.not.so.long), signals[0:(x1-1)],tricky.weights[0:(x1-1)])
   signals[x1] = some.function(signals, tricky.weights)
end for
signals[0] = some.linear.function(signals[1], old.data.stream[0])

=> signals = funk(signals, linear.signal.functions, data.stream)  

weighted stats

statistical basics

  • [ ] introduce the notion of event time (modifying base statistics for trade volume, trade block size and time bursts)

moment calcs

picks up from the df created here

require(moments)
source("R/functions.moments.R")
num.quantiles=5
length.ma=100
wmean=t(rep(1,length.ma)/length.ma)
wvol=wmean
ret = diff(df$price)

mhist=weighted.historicals(ret,list(mean=wmean,vol=wvol))

qs.hmean=quantile(t(mhist$hmean), probs = (1/num.quantiles)*(1:num.quantiles))
qs.hvol=quantile(t(mhist$hvol), probs = (1/num.quantiles)*(1:num.quantiles))
qts.hmean=quantile.ts(mhist$hmean, qs.hmean)
qts.hvol=quantile.ts(mhist$hvol, qs.hvol)

dfg = cbind(times=df$time[-1],ret,mhist,qts.hmean,qts.hvol)

g = ggplot(dfg,aes(x=times,y=hmean,color=hvol))
g = g + geom_point(aes(group=1))
g
str(dfg)

signal calc

delay.signal=1
sig=c(as.logical(rep(0,delay.signal)),
  dfg$hmean[1:(dim(dfg)[1]-delay.signal)])
qs.sig=quantile(sig, seq(0.01,1,0.01))
dfg$qts.sig=quantile.ts(sig,qs.sig)
dfg$pos=as.numeric(dfg$qts.sig>27)
summary(dfg$pos)  

stats

meanstdsharpeskewnesskurtosis
ret-4.45439854038269e-070.0639281360775152-6.96782170370425e-060.00055370337481512215.4881656403179
hmean-1.87084738696073e-070.00152524040456262-0.0001226591808979260.005169807722486813.49203639731598
hvol0.05215472164593950.03640808185107181.432503965995240.1610455586101942.30959331703425
qts.hmean0.6914366860700271.512503265949290.4571472350746031.730329459993243.99404004012053
qts.hvol1.999527833754721.41468019818561.4134133186562-0.0001132492348676581.69941490792223
qts.sig25.368145348806126.85594297816480.9446008047243671.504340874319323.72252342870455
pos0.1728591715175070.3781258164873230.4571472350746031.730329459993243.99404004012053

bucket stats

bcount.1bcount.2bcount.3bcount.4bcount.5br.1br.2br.3br.4br.5babsr.1babsr.2babsr.3babsr.4babsr.5bsqr.1bsqr.2bsqr.3bsqr.4bsqr.5bstd.1bstd.2bstd.3bstd.4bstd.5
11124989070892240829788580100.004820412753009660.003290329575021680.007206729494564820.009443363130965840.003564507813472240.0108231908982670.0130149609713790.01927016799633640.03072516637335230.000891126953368060.002709931869294880.003255095403295750.004820554845862760.007692946469155370.02985188226406790.05183362913862030.0569587948170130.06905559618892430.0872001031189334
20000000000000000000000000
30000000000000000000000000
40000000000000000000000000
50212962001329263264440-0.020203324567994-0.0153275371008844-0.0205037077538188-0.030706398426864300.0207433320811420.0175011242692250.02284454772237980.03335350173952500.005191702667167540.004381527007445160.005732494959505180.0083667372560883400.06916468011271150.06439565838799730.07288535150076230.0861634208992548

std chart

require(reshape)
require(scales)
vol.cats=c("low vol", "low-mid vol", "mid vol", "high-mid vol", "high vol")
ret.cats=c("low ret", "low-mid ret", "mid ret", "high-mid ret", "high ret")
data = b$br
which.name = "return"
colnames(data)=vol.cats
rownames(data)=ret.cats
data.m = melt(data)
colnames(data.m) = c("return.level","volatility.level", which.name)
data.m$return.level = factor(data.m$return.level,levels=ret.cats)
data.m$volatility.level = factor(data.m$volatility.level,levels=vol.cats)
data.m <- transform(data.m, rescale = rescale(data.m[which.name]))
p <- ggplot(data.m, aes(return.level, volatility.level)) + geom_tile(aes(fill = return), 
                                                  colour =   "white") 
p + scale_fill_gradient(low = "white", high = "steelblue", space="Lab")
p
ggsave("pricevolheat.svg")

weighting function

takes a sum of exponential curves maxn is total length of series (from time -1 to time -maxn) slope is a parameter effecting steepness (0 is flat) delay is delaying the curve by x time units

so slope=100, delay = 0 is yesterday slope = 0, delay = 0 is a simple ma

weighted.curve = function(maxn,slope,weight,delay) {
  c = rep(0,maxn)
  for (x1 in 1:length(slope)) {
    if (slope[x1] == 0) {
      c = c + weight[x1] * 
        c(rep(0,delay[x1]),
          rep(1/maxn,maxn-delay[x1]))
    } else {
      c = c + weight[x1] * 
        c(rep(0,delay[x1]),
          exp((-1:(delay[x1]-maxn))*slope[x1])*(exp(slope[x1])-1))
    }
  }
  weighted.curve = c / sum(c)
}
require(ggplot2)
maxn = 100
slope = c(0.01,0,.1)
weight = c(0.3,0.3,0.4)
delay = c(0,10,0)
t1 = weighted.curve(maxn,slope,weight,delay)
df = data.frame(t1,time=1:100)
g2 = ggplot(df,aes(y=t1,x=time))
g2 = g2 + geom_point(aes(group=1))
g2 = g2 + ylim(0,max(t1))
g2
sum(t1)

old school

x is a vector representing slope, delay and weight

 require(nloptr)
 require(moments)
 source("R/functions.moments.R")
 delay.signal=1
 #ret = diff(log(trade.price[]),1)    
 test.funk = function(x,ret) {
   length.ma=as.integer(x[2])
   wmean = weighted.curve(length.ma,x[3:5],x[6:8],as.integer(x[9:11]))
   wvol= weighted.curve(length.ma,x[12:14],x[15:17],as.integer(x[18:20]))
   mhist=weighted.historicals(ret,list(mean=wmean,vol=wvol))  
   sig.mean=c(as.logical(rep(0,delay.signal)),
     mhist$hmean[1:(length(mhist$hmean)-delay.signal)])
   sig.vol=c(as.logical(rep(0,delay.signal)),
     mhist$hvol[1:(length(mhist$hmean)-delay.signal)])
   pos=sig.mean>x[21] & sig.vol<x[22]
   test.funk=-sum(ret*pos)
 }
 
minx = c(1,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,-0.01,0)
initx = c(1,10,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,-0.01,0.01)
maxx = c(1,100,1,1,1,1,1,1,0,0,0,1,1,1,1,1,1,0,0,0,0.01,0.05)

t1 = test.funk(initx,ret)
funk=function (x) {-test.funk(x,ret)}

local_opts <- list("algorithm" = "NLOPT_LN_BOBYQA",
                   "xtol_rel"  = 1.0e-5,
                   "max_time" = 5)
opts <- list("algorithm" = "NLOPT_GN_MLSL",
             "xtol_rel"  = 1.0e-5,
             "maxeval"   = 1000000,
             "maxtime" = 60,
             "local_opts" = local_opts )


res = nloptr(initx, 
       funk, 
       eval_grad_f = NULL,
       lb = minx, 
       ub = maxx, 
       eval_g_ineq = NULL, 
       eval_jac_g_ineq = NULL,
       eval_g_eq = NULL, 
       eval_jac_g_eq = NULL,
       opts = opts
       )

sol = res$solution
funk(sol)

event school

event school simulates pushing an algorithm through the event stream, so there is no look back. The equivalent to a weighting scheme is n stats of the form:

x[n][t] = k[1] * x[i] + decay * x[n][t-1] x[0][t] = r x[1][t] = r^2 signal = all(x[i]<s)

So we have the first two nodes as the last return and squared return. We then have a calculation using those two values that is similar to an exponential moving average on the mean and garch on the volatility (or both). Then we have another exactly the same, but able to also use the value of the third node. So lets say we have a theory that we should go long the market after it has crashed but the volatility has settled down: dropping time subscripts…

node[3] = 0.9 * node[3] + 0.09 * node[1] (ema of the mean return) node[4] = 0.9 * node [4] + 0.09 * node [2] (garch representation of short run volatility) node[5] = 0.99 * node [5] + 0.02 * node[2] (garch of the longer run volatility) signal = node[3] < large negative and node[4] < mid-level vol and node[5] > long-run mid-level vol

require(nloptr)
require(moments)
source("R/functions.moments.R")
delay.signal=1
ret = diff(log(trade.price[]),1)    

test.event.funk = function(x) {
  r=0
  n=as.integer(x[1])
  s=rep(0,n+2)
  for (x1 in 1:length(ret)) {
    s[1]=ret[x1]
    s[2]=ret[x1]^2
    for (x2 in 1:n) {
      s[x2+2] = x[(x2-1)*(n+2)+2] * s[x2+2] 
      for (x3 in 1:(x2+1)) {
        s[x2+2] = s[x2+2] + x[(x2-1)*(n+2)+x3+2] * s[x3]
      }
    }
    sig = s[n+2] < x[n*n+1] 
    r = r + ret[x1]*sig    
  }
  test.event.funk = -r
}

test.event.signals = function(x) {
  n=as.integer(x[1])
  s=rep(0,n+2)
  for (x1 in 1:length(ret)) {
    s[1]=ret[x1]
    s[2]=ret[x1]^2
    for (x2 in 1:n) {
      s[x2+2] = x[(x2-1)*(n+2)+2] * s[x2+2] 
      for (x3 in 1:(x2+1)) {
        s[x2+2] = s[x2+2] + x[(x2-1)*(n+2)+x3+2] * s[x3]
      }
    }
  }
  test.event.signals=s
}



minx = c(3,
  .4,0,0,0,0,
  .4,0,0,0,0,
  .4,0,0,0,0,
  0)

initx = c(3,
  .9,.1,0,0,0,
  .91,0,.11,0,0,
  .92,0,0,.12,.13,
  0.0001)

maxx = c(3,
  .999,.1,.1,0,0,
  .999,.1,.1,.1,0,
  .999,.1,.1,.1,.1,
  0.01)


t1 = test.event.funk(initx)

local_opts <- list("algorithm" = "NLOPT_LN_BOBYQA",
                   "xtol_rel"  = 1.0e-5,
                   "max_time" = 5)
opts <- list("algorithm" = "NLOPT_GN_MLSL",
             "xtol_rel"  = 1.0e-5,
             "maxeval"   = 1000000,
             "maxtime" = 60,
             "local_opts" = local_opts )


res = nloptr(initx, 
       test.event.funk, 
       eval_grad_f = NULL,
       lb = minx, 
       ub = maxx, 
       eval_g_ineq = NULL, 
       eval_jac_g_ineq = NULL,
       eval_g_eq = NULL, 
       eval_jac_g_eq = NULL,
       opts = opts
       )

sol = res$solution

complex event space

The above results are meaningless due to not taking into account a variety of extraneous variables:

  • market open and closing times
  • volume
  • trade size

To correct this, the dimension over which volatility and other statistics are calculated should be abstracted. Passage of calendar time may be one event (1 min, 30 sec etc), volume bucketing another (1 lot, 5 lots, 100 lots…), trade size is another (1 lot, 5 lots, 100 lots).

For example, say a single event is defined as volume*1+trades*1.5 > 2n. Price is recorded whenever this ‘event’ occurs and statistical calculations proceed as per the normal case.

As long as everything is linear the optimization should stay convex.

distributional approach

Let’s say we have an estimate of future returns based on historical returns as follows:

return t+1 = 0.1 * historical (conditional) mean + 0.9 unconditional mean volatility t+1 = 0.8 * historical (conditional) vol + 0.2 unconditional vol

You can have many different historical mean and vol measurements included in the above. Conditional means and vols can also be created from exogenous variables (stat arb)

We then have a distribution forecast for t+1. So:

E(actual return t+1 - forecast return)/forecast std) being normally distributed is an interesting test.

Back solving for which historical means and vols (with weighting schemes as the free variable) that cause error terms to be normally distributed.

  • calc conditionals
  • calc forecast
source("R/functions.algo.R")
source("R/functions.moments.R")
length.ma = 100    
wmean = weighted.curve(length.ma,c(0.1,0,0),c(1,0,0),c(0,0,0))
wvol= weighted.curve(length.ma,c(0.1,0,0),c(1,0,0),c(0,0,0))
mhist=weighted.historicals(ret,list(mean=wmean,vol=wvol))  
w = c(.2,.8,.9,.1)
un = c(0,.00001)  
forecast.mean = rep(0,length(ret))
forecast.mean[2:length(ret)] = w[1] * mhist$hmean[1:length(ret)-1] + w[2] * un[1]
forecast.std = rep(0,length(ret))
forecast.std[2:length(ret)] = w[3] * mhist$hvol[1:length(ret)-1] + w[4] * un[2]
forecast.error = (ret - forecast.mean)/forecast.std
qplot(forecast.mean)

forecast examples

one point forecast
source("R/functions.algo.R")
source("R/functions.moments.R")
length.ma = 100    
wmean = weighted.curve(length.ma,c(0.1,0,0),c(1,0,0),c(0,0,0))
wvol= weighted.curve(length.ma,c(0.1,0,0),c(1,0,0),c(0,0,0))
mhist=weighted.historicals(ret,list(mean=wmean,vol=wvol))  
w = c(.2,.8,.9,.1)
un = c(0,.001)  
forecast.mean = rep(0,length(ret))
forecast.mean[2:length(ret)] = w[1] * mhist$hmean[1:length(ret)-1] + w[2] * un[1]
forecast.std = rep(0,length(ret))
forecast.std[2:length(ret)] = w[3] * mhist$hvol[1:length(ret)-1] + w[4] * un[2]
forecast.error = (ret[-1] - forecast.mean[-1])/forecast.std[-1]
qplot(forecast.mean)
summary(forecast.error)  
n point forecast at t

instead of ret being used we could:

  • do a simulation
  • calc the one sd and mean bands

moment functions

startup
momentum.startup = function() {
  rm(list = ls())
  p=NULL
  require(xts)
  require(moments)
  require(nloptr)
  require(quantmod)
  require(ascii)
  require(plyr)
  require(ggplot2)
  options(warn=-1)
}
return.to.stats.raw
return.to.stats.raw = function(r) {
  r=na.omit(r)
  mean=apply(r,2,sum)/dim(r)[1]
  std=apply(r,2,sd)
  sharpe=mean/std
  skewness=apply(r,2,skewness)
  kurtosis=apply(r,2,kurtosis)
  
  return.to.stats.raw = data.frame(
    mean=mean,
    std=std,
    sharpe=sharpe,
    skewness=skewness,
    kurtosis=kurtosis)
}
return.to.stats
return.to.stats = function(xts.r) {
  r=na.omit(xts.r)
  n=dim(r)[1]
  years=as.numeric(difftime(index(r[n]),index(r[1]), units="days")/365.25)
  days.per.year=n/years
  mean=apply(r,2,sum)/years
  std=apply(r,2,sd)*sqrt(days.per.year)
  sharpe=mean/std
  skewness=apply(r,2,skewness)
  kurtosis=apply(r,2,kurtosis)
    
  return.to.stats = data.frame(
    mean=mean,
    std=std,
    sharpe=sharpe,
    skewness=skewness,
    kurtosis=kurtosis)
}
weighted.historicals
weighted.historicals = function(rets, hist.weights) {
  hmean=as.double(filter(rets,hist.weights$mean,sides=1))
  hmean[1:length(hist.weights$mean)]=cumsum(rets[1:length(hist.weights$mean)]*t(as.matrix(hist.weights$mean)))
  xvol=(rets-hmean)^2
  hvol=as.double(filter(xvol,hist.weights$vol,sides=1))^0.5
  hvol[1:length(hist.weights$vol)]=cumsum(xvol[length(hist.weights$vol)]*t(as.matrix(hist.weights$vol)))^0.5
  weighted.historicals=data.frame(hmean=hmean, hvol=hvol)
}
  • unit test
    hist.weights = list(mean=wmean,vol=wvol)
    rets2 = coredata(rets)  
    hmean=as.double(filter(rets2,hist.weights$mean,sides=1))
    hmean[1:length(hist.weights$mean)]=cumsum(rets2[1:length(hist.weights$mean)]*t(as.matrix(hist.weights$mean)))
    xvol=(rets2-hmean)^2
    hvol=as.double(filter(xvol,hist.weights$vol,sides=1)^0.5)
    hvol[1:length(hist.weights$vol)]=cumsum(xvol[length(hist.weights$vol),]*t(as.matrix(hist.weights$vol)))^0.5
    summary(data.frame(hmean=hmean, hvol=hvol))
        
quantile.ts
quantile.ts = function(ts,q) {
  n=length(ts)
  nq=length(q)
  quantile.ts=
  apply(matrix(ts,n,nq)>t(matrix(q,nq,n)),1,sum)
}
bucket.stats.posandr
bucket.stats.posandr = function(pos, r, qts1, qts2, num.quantiles, delay.signal) {
  br=matrix(0,num.quantiles,num.quantiles);bcount=br;bpos=br;c=br;babsr=br;bsqr=br;bstd=br;
  for (i in 1:num.quantiles){
    for (j in 1:num.quantiles){
      sb=(qts1==(i-1))&(qts2==(j-1))
      sb=c(as.logical(rep(0,delay.signal)), sb[-(length(sb):(length(sb)-delay.signal))])
      bcount[i,j]=sum(sb)
      if (bcount[i,j]>0){
        br[i,j]=mean(r[sb])
        bpos[i,j]=mean(pos[sb])
        babsr[i,j]=mean(abs(r[sb]))
        bsqr[i,j]=mean(r[sb]^2)
        bstd[i,j]=apply(r[sb],2,sd)
      }
    }
  }
  bucket.stats=list(
    bcount=bcount,
    br=br,
    bpos=bpos,
    babsr=babsr,
    bsqr=bsqr,
    bstd=bstd)
}
bucket.stats
bucket.stats = function(r, qts1, qts2, num.quantiles, delay.signal) {
  br=matrix(0,num.quantiles,num.quantiles);bcount=br;c=br;babsr=br;bsqr=br;bstd=br;
  for (i in 1:num.quantiles){
    for (j in 1:num.quantiles){
      sb=(qts1==(i-1))&(qts2==(j-1))
      sb=c(as.logical(rep(0,delay.signal)), sb[-(length(sb):(length(sb)-delay.signal))])
      bcount[i,j]=sum(sb)
      if (bcount[i,j]>0){
        br[i,j]=mean(r[sb])
        babsr[i,j]=mean(abs(r[sb]))
        bsqr[i,j]=mean(r[sb]^2)
        bstd[i,j]=sd(r[sb])
      }
    }
  }
  bucket.stats=list(
    bcount=bcount,
    br=br,
    babsr=babsr,
    bsqr=bsqr,
    bstd=bstd)
}
  • unit test
    qts1 = res$qts.ret
    qts2 = res$qts.vol
    r = res$rets
    br=matrix(0,num.quantiles,num.quantiles);bcount=br;c=br;babsr=br;bsqr=br;bstd=br;
    for (i in 1:num.quantiles){
      for (j in 1:num.quantiles){
        sb=(qts1==(i-1))&&(qts2==(j-1))
        sb=c(as.logical(rep(0,delay.signal)), sb[-(length(sb):(length(sb)-delay.signal))])
        bcount[i,j]=sum(sb)
        if (bcount[i,j]>0){
          br[i,j]=mean(r[sb])
          babsr[i,j]=mean(abs(r[sb]))
          bsqr[i,j]=mean(r[sb]^2)
          bstd[i,j]=apply(r[sb],2,sd)
        }
      }
    }
    data.frame(
      bcount=bcount,
      br=br,
      babsr=babsr,
      bsqr=bsqr,
      bstd=bstd)
        
print.qstat
print.qstat = function(stat,count) {
  row.count = apply(count,1,sum)
  col.count = apply(count,2,sum)
  all.count = sum(row.count)
  row.stat = apply(stat*count,1,sum)/row.count
  col.stat = apply(stat*count,2,sum)/col.count
  all.stat = sum(row.stat*row.count)/all.count
  out=rbind(cbind(stat,row.stat), c(col.stat,all.stat))
  colnames(out)=c("low", "low-mid", "mid", "high-mid", "high", "all")
  rownames(out)=c("low", "low-mid", "mid", "high-mid", "high", "all")
  print.qstat=out }
print.qstat.count
print.qstat.count = function(count) {
  row.stat = apply(count,1,sum)
  col.stat = apply(count,2,sum)
  all.stat = sum(row.stat)
  out=rbind(cbind(count,row.stat), c(col.stat,all.stat))
  colnames(out)=c("low", "low-mid", "mid", "high-mid", "high", "all")
  rownames(out)=c("low", "low-mid", "mid", "high-mid", "high", "all")
  print.qstat=out }

algorithm classifications

This is a summary of algorithm classification that will eventually become coding specifications.

The lists below are examples of categories and not meant to be exhaustive.

data-set choice

Price Endogenous

Algorithms that are endogenous to price
  • moment-based calculations
    • moving average
    • GARCH
    • volatility
    • momentum
  • technical analytics
Market Structure Endogenous

Algorithms that are endogenous to Price and market components closely related to a single security such as:

  • volume
  • bid/ask
  • market depth, order book
Statistical Arbitrage

Algorithms that look to exploit near-arbitrage bound relationships between securities.

  • VIX versus SP500 versus options
  • cross-correlation or lead-lag relationships between securities

Exogenous

Algorthims that seek to exploit relationships between price and factors external to the security market price information set.

  • fundamentals
  • twitter mentions
  • earnings announcement/ event-based analytics

algorithmic theoretica

Ways to perform forecasting

  • linear
    • regression
  • non-linear
    • NN
    • GA
  • parameter fit
    • local versus global minima
    • stationary versus non-stationary (versus semi-stationary)

risk management

position
  • leverage employed
  • P&L distribution choices (objective function)
trading
  • transactional cost importance
  • entry and exit methodologies
  • security selection

event stream engineering

disruptor

The disrupture is a quality open-source project with a committment to low-latency.

disruptor project disruptor docs lmax Martin Fowler

messaging

http://msgpack.org/

Protocol Buffers FAST ActiveQuant Google Protocol Buffer example

http://www.aurorasolutions.org/over-6-million-transactions-per-second-in-a-real-time-system-an-out-of-the-box-approach/

http://programmers.stackexchange.com/questions/121592/what-to-look-for-in-selecting-a-language-for-algorithmic-high-frequency-trading

http://stackoverflow.com/questions/731233/activemq-or-rabbitmq-or-zeromq-or

http://wiki.msgpack.org/pages/viewpage.action?pageId=1081387

http://kenai.com/downloads/javafx-sam/EventProcessinginAction.pdf

esper

http://coffeeonesugar.wordpress.com/2009/07/21/getting-started-with-esper-in-5- minutes/

haskell

http://community.haskell.org/~simonmar/par-tutorial.pdf

logger/observer

trader

connect with IB

references

articles

http://www.aurorasolutions.org/over-6-million-transactions-per-second-in-a-real-time-system-an-out-of-the-box-approach/

http://programmers.stackexchange.com/questions/121592/what-to-look-for-in-selecting-a-language-for-algorithmic-high-frequency-trading

http://stackoverflow.com/questions/731233/activemq-or-rabbitmq-or-zeromq-or

http://wiki.msgpack.org/pages/viewpage.action?pageId=1081387

http://kenai.com/downloads/javafx-sam/EventProcessinginAction.pdf

algo

sornette

http://arxiv.org/find/all/1/all:+sornette/0/1/0/all/0/1

some specific articles

http://arxiv.org/pdf/cond-mat/0301543.pdf http://arxiv.org/pdf/1108.0077.pdf http://arxiv.org/ftp/arxiv/papers/1012/1012.4118.pdf http://arxiv.org/pdf/1011.2882.pdf http://arxiv.org/pdf/1007.2420.pdf http://arxiv.org/pdf/0909.1007.pdf http://arxiv.org/ftp/arxiv/papers/0812/0812.2449.pdf http://arxiv.org/pdf/0909.1007.pdf

other system components and projects

ActiveQuant libcppa esper triceps cep-trader algo-trader tradelink ASM libtrading fix tradexoft

https://github.com/andykent/river I like the idea of pushing data through queries (rather than queries through data).

http://www.quickfixengine.org/quickfix/doc/html/about.html major broker API

clipboard

time

http://apple.stackexchange.com/questions/82878/how-to-tell-when-time-has-been-synced-with-ntp-server-in-mac-os-x-lion