Matt Kelsey

Technical notes, thoughts and examples

Test Driven Development of a Flask API

Recently, I have been fiddling around with Flask to create some RESTful APIs. I found that when developing these APIs, instead of using and abusing curl commands to test them, it was easiest to write unit tests as I went along to verify that the routes I had were working. The following is a commentary on how I set it all up and got it running.

The packages I used for this setup are Flask, Flask-RESTful, SQLAlchemy, nose and psycopg2 (the PostgreSQL driver).

First, let’s create a simple PostgreSQL database. Since I want the test database to be separate from the main database, I am going to create one called flaskexample and one called flaskexample_test. I will show you later how to specify the test database when running the nose tests.

$ createuser flaskexample -P -d
  Enter password for new role:
  Enter it again:
$ createdb flaskexample -U flaskexample -h localhost
$ createdb flaskexample_test -U flaskexample -h localhost

Now we can create our simple Flask application. For the purposes of this blog, the Flask application will only have a single User table to work with. Copy the code below into your application.py file, then start the Python interpreter in the console and run the following:

>>> from application import init_db
>>> init_db()

If this runs successfully, it should create the users table in your flaskexample database.

Now for the main Flask application. In this contrived example, we will have two routes to get to our user data. We will have the /users route, where we can either get a list of users or post a new user. We also have the /users/<string:id> route, where we will be able to get a single user. Our tests will center around getting a list of users, adding a user, getting a specific user, and trying to add a user whose email already exists.

import os
from flask import Flask
from flask.ext.restful import Resource, reqparse, Api
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import scoped_session, sessionmaker, relationship, column_property
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.exc import IntegrityError

app = Flask("flasktestexample")
api = Api(app)
app.debug = True

if os.environ.get('DATABASE_URL') is None:
  engine = create_engine('postgres://flaskexample:flask@localhost:5432/flaskexample', convert_unicode=True)
else:
  engine = create_engine(os.environ['DATABASE_URL'], convert_unicode=True)

db_session = scoped_session(sessionmaker(autocommit=False,
                                         autoflush=False,
                                         bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

@app.teardown_request
def teardown_request(exception):
    db_session.remove()


def init_db():
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

#User Model
class User(Base):
    __tablename__ = 'users'

    #from http://stackoverflow.com/a/11884806
    def as_dict(self):
      return {c.name: getattr(self, c.name) for c in self.__table__.columns}

    id = Column(Integer, primary_key=True)
    first_name = Column(String(200))
    last_name = Column(String(200))
    email = Column(String(200), unique=True)

#Parser arguments that Flask-Restful will check for
parser = reqparse.RequestParser()
parser.add_argument('first_name', type=str, required=True, help="First Name Cannot Be Blank")
parser.add_argument('last_name', type=str, required=True, help="Last Name Cannot Be Blank")
parser.add_argument('email', type=str, required=True, help="Email Cannot Be Blank")

#Flask Restful Views
class UserView(Resource):
  def get(self, id):
    e = User.query.filter(User.id == id).first()
    if e is not None:
      return e.as_dict()
    else:
      return {}


class UserViewList(Resource):
  def get(self):
    results = []
    for row in User.query.all():
      results.append(row.as_dict())
    return results

  def post(self):
      args = parser.parse_args()
      o = User()
      o.first_name = args["first_name"]
      o.last_name = args["last_name"]
      o.email = args["email"]

      try:
        db_session.add(o)
        db_session.commit()
      except IntegrityError, exc:
        db_session.rollback()
        return {"error": exc.message}, 500

      return o.as_dict(), 201

#Flask Restful Routes
api.add_resource(UserViewList, '/users')
api.add_resource(UserView, '/users/<string:id>')

if __name__ == '__main__':
    port = int(os.environ.get('PORT', 5000))
    app.run(host='0.0.0.0', port=port)

For the nose tests, I create a tests directory under my application root. For this example, we will have two files: an __init__.py file, where we will set up the Flask test client and the SQLAlchemy database session, and a testusers.py file, which is where all the tests will go. See the code blocks below.

__init__.py

import os
import base64
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from application import init_db, db_session

init_db()

import application
test_app = application.app.test_client()

def teardown():
  db_session.remove()

testusers.py

import json
import nose
from nose.tools import *

from application import User

from tests import test_app

def check_content_type(headers):
  eq_(headers['Content-Type'], 'application/json')

def test_user_routes():
  rv = test_app.get('/users')
  check_content_type(rv.headers)
  resp = json.loads(rv.data)
  #make sure we get a response
  eq_(rv.status_code,200)
  #make sure there are no users
  eq_(len(resp), 0)

  #create a user
  d = dict(first_name="User1First", last_name="User1Last",email="User1@User1.com")
  rv = test_app.post('/users', data=d)
  check_content_type(rv.headers)
  eq_(rv.status_code,201)

  #Verify we sent the right data back
  resp = json.loads(rv.data)
  eq_(resp["email"],"User1@User1.com")
  eq_(resp["first_name"],"User1First")
  eq_(resp["last_name"],"User1Last")

  #Get users again...should have one
  rv = test_app.get('/users')
  check_content_type(rv.headers)
  resp = json.loads(rv.data)
  #make sure we get a response
  eq_(rv.status_code,200)
  eq_(len(resp), 1)

  #GET the user with specified ID
  rv = test_app.get('/users/%s' % resp[0]['id'])
  check_content_type(rv.headers)
  eq_(rv.status_code,200)
  resp = json.loads(rv.data)
  eq_(resp["email"],"User1@User1.com")
  eq_(resp["first_name"],"User1First")
  eq_(resp["last_name"],"User1Last")

  #Try and add Duplicate User Email
  rv = test_app.post('/users', data=d)
  check_content_type(rv.headers)
  eq_(rv.status_code,500)
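
One nice thing about this setup is how cheap it is to keep adding tests as the API grows. As a hedged sketch (it assumes the same imports and test_app as testusers.py above, and that no user exists with id 999999), you could append something like this to testusers.py to cover a couple of edge cases:

def test_user_edge_cases():
  #GET a user id that does not exist; UserView returns an empty dict
  rv = test_app.get('/users/999999')
  check_content_type(rv.headers)
  eq_(rv.status_code, 200)
  eq_(json.loads(rv.data), {})

  #POST with a missing required field; reqparse should reject it with a 400
  d = dict(first_name="NoEmail", last_name="User")
  rv = test_app.post('/users', data=d)
  eq_(rv.status_code, 400)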

For the tests, I create a simple runtests.sh shell script, which is where I specify a different database to use when running the tests. It contains the following line:

DATABASE_URL=postgres://flaskexample:flask@localhost:5432/flaskexample_test nosetests --nocapture

When running the runtests.sh file, you should get some output like:

$ ./runtests.sh
.
----------------------------------------------------------------------
Ran 1 test in 0.277s

OK

This means your tests ran successfully. If you check your flaskexample_test database, you should have one user in the users table. When you run the tests again, that user will be removed via the init_db() call, and the tests will run against a blank database.

Obviously, this is a very simple example, but I think it is a good starting point for setting up Test Driven Development with Flask and SQLAlchemy.
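
For instance, a natural next step would be supporting DELETE on the /users/<string:id> route. A minimal sketch of what that extra method on UserView could look like (this is hypothetical and not part of the example repo):

  #Hypothetical delete method for the UserView class in application.py
  def delete(self, id):
    e = User.query.filter(User.id == id).first()
    if e is None:
      return {"error": "user not found"}, 404
    db_session.delete(e)
    db_session.commit()
    return {}, 204

The matching test would then POST a user, DELETE /users/<id> and assert that a fresh GET of /users comes back empty.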

Feel free to email me with any questions or comments. The git repo for this is located here.

Authorizing and Signing a Twitter API Call Using Python

I was reading through Twitter’s excellent documentation on Authorizing and Signing an API request and decided I wanted to give implementing it a shot. The goal was to use as few external Python libraries as possible. The only library I ended up using was the wonderful Requests: HTTP for Humans library, to actually make the call to Twitter to get my user timeline. The git repo for this is located here.

Below is an explanation of the code I wrote to accomplish this. I am assuming you already know how to set up a Twitter application and get the needed keys.

The code assumes you have a ‘settings.cfg’ file that houses your twitter_consumer_secret, twitter_consumer_key, access_token, and access_token_secret.

It looks like:

[Keys]
twitter_consumer_secret: YOUR_CONSUMER_SECRET
twitter_consumer_key: YOUR_CONSUMER_KEY
access_token: YOUR_ACCESS_TOKEN
access_token_secret: YOUR_ACCESS_TOKEN_SECRET

First off, let’s look at the main part of the program. The first part of the program just sets up some configuration variables.

if __name__ == '__main__':

  #Read the Config File to get the twitter keys and tokens
  config = ConfigParser.RawConfigParser()
  config.read('settings.cfg')

  #method, url and parameters to call
  method = "get"
  url = "https://api.twitter.com/1.1/statuses/user_timeline.json"
  url_parameters = {
      'exclude_replies': 'true'
  }

  #configuration hash for the keys
  keys = {
      "twitter_consumer_secret": config.get(
          'Keys', 'twitter_consumer_secret'),
      "twitter_consumer_key": config.get('Keys', 'twitter_consumer_key'),
      "access_token": config.get('Keys', 'access_token'),
      "access_token_secret": config.get('Keys', 'access_token_secret')
  }

  oauth_parameters = get_oauth_parameters(
      keys['twitter_consumer_key'],
      keys['access_token']
  )

  oauth_parameters['oauth_signature'] = generate_signature(
      method,
      url,
      url_parameters, oauth_parameters,
      keys['twitter_consumer_key'],
      keys['twitter_consumer_secret'],
      keys['access_token_secret']
  )

  headers = {'Authorization': create_auth_header(oauth_parameters)}

  url += '?' + urllib.urlencode(url_parameters)

  r = requests.get(url, headers=headers)

  print json.dumps(json.loads(r.text), sort_keys=False, indent=4)

Next, let’s collect the six key/value parameters needed for the Authorization header and for generating the signature. These six parameters will be used in generating the signature, and then that signature plus these six parameters will be used as the Authorization header for the request.

  • oauth_timestamp: Indicates when the request was created
  • oauth_signature_method: Cryptographic hash function used to sign the request
  • oauth_version: The OAuth version used
  • oauth_token: The Access Token generated by Twitter when a user authorizes a Twitter app to access their account
  • oauth_nonce: A unique token generated per request. This helps guard against replay attacks
  • oauth_consumer_key: Identifies which application is making the request

I accomplish this in the get_oauth_parameters function.

def get_oauth_parameters(consumer_key, access_token):
    """Returns OAuth parameters needed for making request"""
    oauth_parameters = {
        'oauth_timestamp': str(int(time.time())),
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_version': "1.0",
        'oauth_token': access_token,
        'oauth_nonce': get_nonce(),
        'oauth_consumer_key': consumer_key
    }

    return oauth_parameters

The results of this function will look something like:

{
  'oauth_nonce': 'NDIzMjg0NTQ5NDI4ODgwNDg3OTg2OTMw',
  'oauth_timestamp': '1368365251',
  'oauth_consumer_key': 'YOUR_CONSUMER_KEY',
  'oauth_signature_method': 'HMAC-SHA1',
  'oauth_version': '1.0',
  'oauth_token': 'YOUR_ACCESS_TOKEN'
}

To get the nonce, I just do this:

def get_nonce():
    """Unique token generated for each request"""
    n = base64.b64encode(
        ''.join([str(random.randint(0, 9)) for i in range(24)]))
    return n

After collecting the needed parameters, we can go ahead and generate the oauth_signature header value. The generate_signature method will need the following:

    oauth_parameters['oauth_signature'] = generate_signature(
        method,
        url,
        url_parameters, oauth_parameters,
        keys['twitter_consumer_key'],
        keys['twitter_consumer_secret'],
        keys['access_token_secret']
    )

  • method: This will either be GET or POST
  • url: The URL to which the request is directed
  • url_parameters: Any endpoint-specific URL parameters a particular request might need
  • oauth_parameters: The oauth_parameters we built above
  • oauth_consumer_key: Your consumer key
  • oauth_consumer_secret: Your consumer secret
  • oauth_token_secret: Your access token secret
  • status: Optional, but if posting a tweet, this needs to be part of the signature

The method looks like:

def generate_signature(method, url, url_parameters, oauth_parameters,
                       oauth_consumer_key, oauth_consumer_secret,
                       oauth_token_secret=None, status=None):
    """Create the signature base string"""

    #Combine parameters into one hash
    temp = collect_parameters(oauth_parameters, status, url_parameters)

    #Create string of combined url and oauth parameters
    parameter_string = stringify_parameters(temp)

    #Create your Signature Base String
    signature_base_string = (
        method.upper() + '&' +
        escape(str(url)) + '&' +
        escape(parameter_string)
    )

    #Get the signing key
    signing_key = create_signing_key(oauth_consumer_secret, oauth_token_secret)

    return calculate_signature(signing_key, signature_base_string)

The first part of this method combines the oauth_parameters, status and url_parameters into one hash, sorts the parameters alphabetically using an OrderedDict, and encodes them into a single string.

According to the Twitter documentation, in order to generate the proper string you need to:

  • Percent encode every key and value that will be signed
  • Sort the list of parameters alphabetically by encoded key
  • For each key/value pair:
    • Append the encoded key to the output string
    • Append the ‘=’ character to the output string
    • Append the encoded value to the output string
    • If there are more key/value pairs remaining, append a ‘&’ character to the output string

I accomplish this in the stringify_parameters method.

def stringify_parameters(parameters):
    """Orders parameters, and generates string representation of parameters"""
    output = ''
    ordered_parameters = {}
    ordered_parameters = collections.OrderedDict(sorted(parameters.items()))

    counter = 1
    for k, v in ordered_parameters.iteritems():
        output += escape(str(k)) + '=' + escape(str(v))
        if counter < len(ordered_parameters):
            output += '&'
            counter += 1

    return output
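
As a quick sanity check, feeding stringify_parameters a small dictionary should produce something like the following (this assumes escape does RFC 3986 percent encoding, so a space becomes %20):

>>> stringify_parameters({'b': '2', 'a': '1', 'c': 'hello world'})
'a=1&b=2&c=hello%20world'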

After we have our stringified parameters, we can create our signature_base_string. In order to do this, we need to (an example of the result follows this list):

  • Convert the HTTP method to uppercase
  • Append a ‘&’ character
  • Percent encode the URL and append it to the output string
  • Append a ‘&’ character
  • Percent encode the parameter string and append it to the output string
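
Putting those steps together for the user_timeline request above, the signature base string ends up looking roughly like this (using the placeholder nonce and timestamp from the earlier example):

GET&https%3A%2F%2Fapi.twitter.com%2F1.1%2Fstatuses%2Fuser_timeline.json&exclude_replies%3Dtrue%26oauth_consumer_key%3DYOUR_CONSUMER_KEY%26oauth_nonce%3DNDIzMjg0NTQ5NDI4ODgwNDg3OTg2OTMw%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1368365251%26oauth_token%3DYOUR_ACCESS_TOKEN%26oauth_version%3D1.0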

We are now able to create our signing key and then calculate the signature to put in the Authorization Header.

To create the signing key, we simply need to combine the oauth_consumer_secret and the oauth_token_secret joined by a ‘&’ character.

def create_signing_key(oauth_consumer_secret, oauth_token_secret=None):
    """Create key to sign request with"""
    signing_key = escape(oauth_consumer_secret) + '&'

    if oauth_token_secret is not None:
        signing_key += escape(oauth_token_secret)

    return signing_key

Now that we have the signing_key, we can take that along with the signature_base_string and calculate our SHA1 signature.

def calculate_signature(signing_key, signature_base_string):
    """Calculate the signature using SHA1"""
    hashed = hmac.new(signing_key, signature_base_string, hashlib.sha1)

    sig = binascii.b2a_base64(hashed.digest())[:-1]

    return escape(sig)

We have our signature; now all that is left is to create our Authorization header for the request and make the request. In order to properly create the header, we need to combine our six parameters from get_oauth_parameters with the signature we just created, order them alphabetically and append them to a string beginning with “OAuth ”. Each key/value pair is joined by an ‘=’, both key and value need to be percent encoded, and the values are enclosed in ‘”’.

def create_auth_header(parameters):
    """For all collected parameters, order them and create auth header"""
    ordered_parameters = {}
    ordered_parameters = collections.OrderedDict(sorted(parameters.items()))
    auth_header = (
        '%s="%s"' % (k, v) for k, v in ordered_parameters.iteritems())
    val = "OAuth " + ', '.join(auth_header)
    return val
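
For reference, the finished header value comes out looking roughly like this (placeholder values, with the signature already percent encoded):

OAuth oauth_consumer_key="YOUR_CONSUMER_KEY", oauth_nonce="NDIzMjg0NTQ5NDI4ODgwNDg3OTg2OTMw", oauth_signature="GENERATED_SIGNATURE", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1368365251", oauth_token="YOUR_ACCESS_TOKEN", oauth_version="1.0"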

We should now be able to make our request:

    headers = {'Authorization': create_auth_header(oauth_parameters)}

    url += '?' + urllib.urlencode(url_parameters)

    r = requests.get(url, headers=headers)

    print json.dumps(json.loads(r.text), sort_keys=False, indent=4)

Feel free to email me with any questions or comments. The git repo for this is located here.

How I Setup VirtualEnv and VirtualEnvWrapper on My Mac

The following are the steps I take to set up VirtualEnv and VirtualEnvWrapper on a new machine.

  • Make sure pip is installed by running pip in the terminal.

  • If it isn’t installed, install it by running

sudo easy_install pip
  • Next, install virtualenv and virtualenvwrapper
sudo pip install virtualenv
sudo pip install virtualenvwrapper
  • Open up your .bash_profile or .profile, and after your PATH statement, add the following
    # set where virtual environments will live
    export WORKON_HOME=$HOME/.virtualenvs
    # ensure all new environments are isolated from the site-packages directory
    export VIRTUALENVWRAPPER_VIRTUALENV_ARGS='--no-site-packages'
    # use the same directory for virtualenvs as virtualenvwrapper
    export PIP_VIRTUALENV_BASE=$WORKON_HOME
    # makes pip detect an active virtualenv and install to it
    export PIP_RESPECT_VIRTUALENV=true
    if [[ -r /usr/local/bin/virtualenvwrapper.sh ]]; then
        source /usr/local/bin/virtualenvwrapper.sh
    else
        echo "WARNING: Can't find virtualenvwrapper.sh"
    fi

  • Open a new terminal window. You should see virtualenvwrapper.sh being run and setting up your .virtualenvs directory.

  • Test creating a new virtualenv

$ mkvirtualenv testenv
  • You should see something in the console like
New python executable in testenv/bin/python
Installing setuptools............done.
Installing pip...............done.
virtualenvwrapper.user_scripts creating /Users/kelsmj/.virtualenvs/testenv/bin/predeactivate
virtualenvwrapper.user_scripts creating /Users/kelsmj/.virtualenvs/testenv/bin/postdeactivate
virtualenvwrapper.user_scripts creating /Users/kelsmj/.virtualenvs/testenv/bin/preactivate
virtualenvwrapper.user_scripts creating /Users/kelsmj/.virtualenvs/testenv/bin/postactivate
virtualenvwrapper.user_scripts creating /Users/kelsmj/.virtualenvs/testenv/bin/get_env_details
  • That should put you into a new virtualenv called “testenv”

  • Let’s install some packages and create a requirements.txt file

$ pip install rq
$ pip install requests
$ pip install google-api-python-client
$ pip freeze > requirements.txt
  • You have just installed three Python packages into your virtual environment, and created a requirements.txt file that has all the necessary information to reproduce your environment.

  • You can leave your virtualenv by doing

deactivate
  • To create a new virtualenv and install the libraries from the requirements.txt file you just created, do the following.
mkvirtualenv testenv2
pip install -r requirements.txt
  • List of commonly used commands
    • mkvirtualenv (create a new virtualenv)
    • rmvirtualenv (remove an existing virtualenv)
    • workon (change the current virtualenv)
    • add2virtualenv (add external packages in a .pth file to current virtualenv)
    • cdsitepackages (cd into the site-packages directory of current virtualenv)
    • cdvirtualenv (cd into the root of the current virtualenv)
    • deactivate (deactivate virtualenv, which calls several hooks)

Using Symlinks and Virtual Hosts to Serve Pages Outside of the Sites Directory Using Apache on the Mac

I like keeping all my code and websites in a single “Code” directory on my machine, but from what I can tell, it isn’t straightforward to get Apache on the Mac to serve up pages anywhere outside of the default “Sites” directory that is created when you enable “Web Sharing”. After doing a lot of searching, I finally found a solution that works for me.

I doubt this is the only solution, or even the “proper” solution, but it seems to work.

Below are all the steps I took in order to serve pages out of directories other than those in the “Sites” directory. For these steps, I will assume there is an existing directory, ~/Code/MyWebSite, that has a simple “Hello World” index.html file in it.

  • Enable Web Sharing on the Mac by going to System Preferences –> Sharing –> check Enable Web Sharing

  • Edit your username.conf file located in /private/etc/apache2/users and add the “FollowSymLinks” directive

<Directory "/Users/yourUserName/Sites/">
        Options Indexes MultiViews FollowSymLinks
        AllowOverride None
        Order allow,deny
        Allow from all
    </Directory>
  • Edit the /private/etc/apache2/httpd.conf file and make sure the line under “# Virtual hosts” is not commented out, like so:
Include /private/etc/apache2/extra/httpd-vhosts.conf
  • Edit the /private/etc/apache2/extra/httpd-vhosts.conf file and add:
<VirtualHost *:80>
    <Directory /Users/yourUserName/Sites/MyWebSite>
        Options +FollowSymlinks +SymLinksIfOwnerMatch
        AllowOverride All
    </Directory>
    DocumentRoot /Users/yourUserName/Sites/MyWebSite
    ServerName MyWebSite.local
</VirtualHost>
  • Edit the /etc/hosts file and add this at the top:
127.0.0.1 MyWebSite.local
  • Make a Symlink to link your Code directory to one in the Sites directory.
ln -s ~/Code/MyWebSite ~/Sites/MyWebSite
  • Run apachectl to check config file syntax.
sudo apachectl -t
  • Restart Apache
sudo apachectl restart
  • Open a browser and go to MyWebSite.local and you should see the results.

Download Photos From Flickr via API

In my previous post, I gave an example of how to authenticate a user against the Flickr API using their new OAuth authentication scheme.

I am going to expand on that post a bit, and show you how to download all the images for a given photo using the flickr.photos.getSizes API call.

If you followed my previous post, you should have a “token” file in your directory that contains the “oauth_token” and “oauth_token_secret” in it. You should also have an “apikeys” file that contains your application “api_key” and “secret”. We will need some code to read these files to get the keys and secrets.

Here is what I came up with:

class APIKeys(object):
    """Base class to read the APIKeys from file"""

    def __init__(self, filename='apikeys'):
        try:
            fp = open(filename)
        except IOError as e:
            if e.errno == errno.ENOENT:
                print "file does not exist"
            raise
        else:
            with fp:
                self.apikey = fp.readline().rstrip('\n')
                self.apisecret = fp.readline()


class TokenKeys(object):
    """Base class to read the Access Tokens"""

    def __init__(self, filename='token'):
        try:
            fp = open(filename)
        except IOError as e:
            if e.errno == errno.ENOENT:
                print "file does not exist"
            raise
        else:
            with fp:
                self.token = fp.readline().rstrip('\n')
                self.secret = fp.readline()

The next task at hand was coming up with a base class that encapsulates all the work needed to make subsequent Flickr API calls once you have your API key and token. I named this class “FlickrApiMethod”, and it sets up all the data needed to make Flickr API requests. It has a “makeCall” method that makes the REST call, loads the returned JSON content and returns True if the call was successful, or False if it was not. I plan on expanding this a bit to support all the different response formats that Flickr supports, but for now it just supports JSON. This class also has a stub “getParameters” method that allows subclasses to fill in the additional parameters needed for a specialized API call.

Here is the code:

class FlickrApiMethod(object):
    """Base class for Flickr API calls"""

    def __init__(self,nojsoncallback=True,format='json',parameters=None):
        apifile = APIKeys()
        tokenfile = TokenKeys()

        self.consumer = oauth.Consumer(key=apifile.apikey, secret=apifile.apisecret)
        self.token = oauth.Token(tokenfile.token, tokenfile.secret)

        if nojsoncallback:
            self.nojsoncallback = 1
        else:
            self.nojsoncallback = 0
        if not parameters:
            parameters = {}

        self.url = "http://api.flickr.com/services/rest"

        defaults = {
            'format':format,
            'nojsoncallback':self.nojsoncallback,
            'oauth_timestamp': str(int(time.time())),
            'oauth_nonce': oauth.generate_nonce(),
            'signature_method': "HMAC-SHA1",
            'oauth_token':self.token.key,
            'oauth_consumer_key':self.consumer.key,
        }

        defaults.update(parameters)
        self.parameters = defaults

    def makeCall(self):
        self.parameters.update(self.getParameters())
        req = oauth.Request(method="GET", url=self.url, parameters=self.parameters)
        req['oauth_signature'] = oauth.SignatureMethod_HMAC_SHA1().sign(req,self.consumer,self.token)
        h = httplib2.Http(".cache")
        resp, content = h.request(req.to_url(), "GET")
        self.content = content
        self.json = json.loads(content)

        if self.json["stat"] == "ok":
            return True
        else:
            return False

    def getParameters(self):
        raise NotImplementedError

Next, I will write a class that, given a unique “photo_id”, calls the flickr.photos.getSizes method and writes out the files for all the sizes available.

class FlickrPhotosGetSizes(FlickrApiMethod):
    name ='flickr.photos.getSizes'

    def __init__(self,nojsoncallback=True,format='json',parameters=None,photo_id=None):
        FlickrApiMethod.__init__(self,nojsoncallback,format,parameters)
        self.photo_id = photo_id

    def getParameters(self):
        p ={
            'method':'flickr.photos.getSizes',
            'photo_id':self.photo_id
        }
        return p

    def writePhotos(self):
        for o in self.json["sizes"]["size"]:
            opener = urllib2.build_opener()
            page = opener.open(o["source"])
            my_picture = page.read()
            filename = o["label"].replace(' ', '_') +"_" + self.photo_id + o["source"][-4:]
            print filename
            fout = open(filename,"wb")
            fout.write(my_picture)
            fout.close()

Finally, a little bit of code to use these classes:

print "Please enter a photo id:",
photoId = raw_input()
print "Fetching Photo"

photoSizes = FlickrPhotosGetSizes(photo_id = photoId)
if(photoSizes.makeCall()):
	print "API Call Success! Writing Photos to Disk"
	photoSizes.writePhotos()
else:
	print "API Call Failed"
	

If all went well, you should now have files in your directory for all the different sizes of the photo you specified.
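
Because all the OAuth plumbing lives in FlickrApiMethod, wrapping another API method only takes a few lines. As a hedged sketch (not part of the repo; it assumes Flickr’s usual "_content" wrapping in the JSON response and uses a placeholder photo id), here is what a wrapper for flickr.photos.getInfo could look like:

class FlickrPhotosGetInfo(FlickrApiMethod):
    name = 'flickr.photos.getInfo'

    def __init__(self, nojsoncallback=True, format='json', parameters=None, photo_id=None):
        FlickrApiMethod.__init__(self, nojsoncallback, format, parameters)
        self.photo_id = photo_id

    def getParameters(self):
        #only the method name and its specific arguments need to be supplied here
        return {
            'method': 'flickr.photos.getInfo',
            'photo_id': self.photo_id
        }

photoInfo = FlickrPhotosGetInfo(photo_id='1234567890')  #placeholder photo id
if photoInfo.makeCall():
    print photoInfo.json["photo"]["title"]["_content"]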

Feel free to email me with any questions or comments. The git repo for this is located here.

OAuth User Authentication for Flickr in Python

I have always wanted to write an application that uses the Flickr API to pull all my photos from Flickr and store them locally, so I can further back them up via CrashPlan.

I decided to write this app in Python, a language in which I have no professional experience and have only recently dabbled. I figured it would be a good learning experience. We will see!

The first thing that needs to be accomplished in order to make calls to the Flickr API is authenticating a user. Flickr just recently added support for using OAuth to authenticate a user (see Using OAuth with Flickr), and they will be removing their old authentication API in the future. The focus of this post is to outline the steps I took in order to successfully use OAuth to authenticate a user for my application.

The code below has two prerequisites in order to work correctly:

  1. A text file called “apikeys” in the root directory containing the api_key (consumer_key) and secret for your application. You get the key and secret by creating a new Flickr Application associated with your account. Go to The App Garden (http://www.flickr.com/services/apps/create/apply/) to apply for one. Each entry in this file needs to be on its own line, with the api_key coming first.
  2. Downloading and installing python-oauth2 (https://github.com/simplegeo/python-oauth2).

To setup python-oauth2:

~/$ git clone https://github.com/simplegeo/python-oauth2.git
~/$ cd python-oauth2
~/python-oauth2$ sudo python setup.py install

The first thing to understand about using OAuth with Flickr is that every request must be signed using the HMAC-SHA1 signature method. You do this by building a Request object via the python-oauth2 Request constructor, passing in the url you want to make the request to and the parameters you want to send. You then call the SignatureMethod_HMAC_SHA1().sign method and add the resulting signature to the Request.

To begin the authentication procedure, you first need to create a request that will get a request token. The request will be made to http://www.flickr.com/services/oauth/request_token with the following parameters:

  • oauth_timestamp
  • oauth_signature_method
  • oauth_version
  • oauth_callback
  • oauth_nonce
  • oauth_consumer_key
  • oauth_signature

The code to perform this operation:

url = "http://www.flickr.com/services/oauth/request_token"

# Set the base oauth_* parameters along with any other parameters required
# for the API call.
params = {
	'oauth_timestamp': str(int(time.time())),
	'oauth_signature_method':"HMAC-SHA1",
	'oauth_version': "1.0",
	'oauth_callback': "http://www.mkelsey.com",
	'oauth_nonce': oauth.generate_nonce(),
	'oauth_consumer_key': keys.apikey
}

# Setup the Consumer with the api_keys given by the provider
consumer = oauth.Consumer(key=keys.apikey, secret=keys.apisecret)

# Create our request. Change method, etc. accordingly.
req = oauth.Request(method="GET", url=url, parameters=params)

# Create the signature
signature = oauth.SignatureMethod_HMAC_SHA1().sign(req,consumer,None)

# Add the Signature to the request
req['oauth_signature'] = signature

# Make the request to get the oauth_token and the oauth_token_secret
# I had to directly use the httplib2 here, instead of the oauth library.
h = httplib2.Http(".cache")
resp, content = h.request(req.to_url(), "GET")

If all is successful, Flickr includes an oauth_token and an oauth_token_secret in the response. You can now store those tokens and prompt the user to go authorize your application. They will need to go to http://www.flickr.com/services/oauth/authorize with the oauth_token appended as a querystring parameter, along with the perms parameter indicating whether your application needs read, write or delete privileges. Once the user authorizes your application, Flickr will redirect to the oauth_callback url specified above with an oauth_verifier querystring parameter that you can then use in the third and final step of OAuth authentication.

The code to perform this operation:

authorize_url = "http://www.flickr.com/services/oauth/authorize"

#parse the content
request_token = dict(urlparse.parse_qsl(content))

print "Request Token:"
print "    - oauth_token        = %s" % request_token['oauth_token']
print "    - oauth_token_secret = %s" % request_token['oauth_token_secret']
print

# Create the token object with returned oauth_token and oauth_token_secret
token = oauth.Token(request_token['oauth_token'],
	request_token['oauth_token_secret'])

# You need to authorize this app via your browser.
print "Go to the following link in your browser:"
print "%s?oauth_token=%s&perms=read" %
	(authorize_url, request_token['oauth_token'])
print

# Once you get the verified pin, input it
accepted = 'n'
while accepted.lower() == 'n':
    accepted = raw_input('Have you authorized me? (y/n) ')
oauth_verifier = raw_input('What is the PIN? ')

#set the oauth_verifier token
token.set_verifier(oauth_verifier)

The third and final step in authenticating is exchanging the Request Token for an Access Token. The Access Token is something you will store off for that user and you will be able to make Flickr API calls using the Access Token.

# url to get access token
access_token_url = "http://www.flickr.com/services/oauth/access_token"

# Now you need to exchange your Request Token for an Access Token
# Set the base oauth_* parameters along with any other parameters required
# for the API call.
access_token_parms = {
	'oauth_consumer_key': keys.apikey,
	'oauth_nonce': oauth.generate_nonce(),
	'oauth_signature_method':"HMAC-SHA1",
	'oauth_timestamp': str(int(time.time())),
	'oauth_token':request_token['oauth_token'],
	'oauth_verifier' : oauth_verifier
}

#setup request
req = oauth.Request(method="GET", url=access_token_url,
	parameters=access_token_parms)

#create the signature
signature = oauth.SignatureMethod_HMAC_SHA1().sign(req,consumer,token)

# assign the signature to the request
req['oauth_signature'] = signature

#make the request
h = httplib2.Http(".cache")
resp, content = h.request(req.to_url(), "GET")

#parse the response
access_token_resp = dict(urlparse.parse_qsl(content))

#write out a file with the oauth_token and oauth_token_secret
with open('token', 'w') as f:
	f.write(access_token_resp['oauth_token'] + '\n')
	f.write(access_token_resp['oauth_token_secret'])
f.closed

If all the requests succeeded, you should have a token file in your directory with the oauth_token and the oauth_token_secret. You can use these tokens to make subsequent requests to the Flickr API. In my next post, I will illustrate how to do this.
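
In the meantime, here is a minimal sketch of what such a request could look like. It reuses the consumer and keys objects from the code above, assumes the same imports (oauth, httplib2, json, time), and calls flickr.test.login, which simply echoes back the authorized user:

#read the stored access token back in
with open('token') as f:
	oauth_token = f.readline().rstrip('\n')
	oauth_token_secret = f.readline()

token = oauth.Token(oauth_token, oauth_token_secret)

#every API call is signed the same way as the requests above
params = {
	'oauth_timestamp': str(int(time.time())),
	'oauth_signature_method': "HMAC-SHA1",
	'oauth_version': "1.0",
	'oauth_nonce': oauth.generate_nonce(),
	'oauth_consumer_key': keys.apikey,
	'oauth_token': oauth_token,
	'method': 'flickr.test.login',
	'format': 'json',
	'nojsoncallback': '1'
}

req = oauth.Request(method="GET", url="http://api.flickr.com/services/rest", parameters=params)
req['oauth_signature'] = oauth.SignatureMethod_HMAC_SHA1().sign(req, consumer, token)

h = httplib2.Http(".cache")
resp, content = h.request(req.to_url(), "GET")
print json.loads(content)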

Feel free to email me with any questions or comments. The git repo for this is located here.