Header & Auth Methods
Introduction
The TestSipLuaAgent Lua API library file test_sip_agent.lua provides methods for accessing SIP message headers and constructing header values related to authentication.
Header & Auth Methods
Header Fetch
The header_fetch method fetches a specific header value from a SIP message that is returned by an “expect” API method such as invite_expect_response.
The header_fetch method takes the following arguments:
Argument | Type | Description |
---|---|---|
headers | Object | Pass this parameter as request.headers or response.headers from the returned request or response message. |
header_name | String | The name (case-insensitive) of the header to process and return. |
If the header is not found, the returned value is nil and a failed match test is generated.
If the indicated header is found, a single table structure is returned:
Field | Type | Description |
---|---|---|
header | Object | A returned table containing information about the matched header. |
.name | String | The name as per the first matching header line found. |
.value | String | The complete string value of the first matching header line. |
.values | Array of String | An array of each of the comma-separated parts of the first matching header value. |
.values_hash | Table | Each of the comma-separated parts of the first matching header value is examined to see if it has the form of a header parameter, i.e. "(key)=(parameter)". Each matching parameter is added as an entry to this table, where the key is the upper-case parameter key and the value is the parameter value. |
Example: See the example for auth_header_value below.
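As a rough illustration of the returned table layout described above, the following stand-alone sketch builds the same fields from a single header line. It is not the library's implementation: sketch_header_table is a hypothetical helper, commas inside quoted strings are not handled, and stripping the surrounding double quotes from parameter values is an assumption.

```lua
-- Hypothetical helper, NOT part of test_sip_agent.lua.
-- Builds a table shaped like the one described for header_fetch.

-- Remove leading and trailing whitespace.
local function trim (s)
    return s:match ("^%s*(.-)%s*$")
end

local function sketch_header_table (name, value)
    local header = { name = name, value = value, values = {}, values_hash = {} }

    -- Split the header value on commas.
    -- (Assumption: no commas appear inside quoted strings.)
    for part in value:gmatch ("[^,]+") do
        local item = trim (part)
        table.insert (header.values, item)

        -- Look for the "(key)=(parameter)" form, splitting at the first "=".
        local key, param = item:match ("^(.-)%s*=%s*(.*)$")
        if key and key ~= "" then
            -- Assumption: surrounding double quotes are removed from the value.
            param = param:match ('^"(.*)"$') or param
            header.values_hash[key:upper ()] = param
        end
    end

    return header
end

local sample = sketch_header_table ("Authorization",
    'Digest username="user", realm="test", nonce="0a4f113b", response="abc", cnonce="xyz"')
print (sample.values_hash['DIGEST USERNAME'], sample.values_hash['CNONCE'])
```

Because the scheme name and the first parameter sit in the same comma-separated part, the first key in this sketch comes out as DIGEST USERNAME rather than USERNAME, which is the key used in the BYE example in this document.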
Auth Header Response
The auth_header_response method implements the algorithm for building the Response part of the Authorization header using specified parameters. It can be used to check the Response part in a received Authorization header.
The method takes the following arguments:
Argument | Type | Description |
---|---|---|
uri | String | The URI from the request that has been challenged. |
method | String | The SIP method from the request that has been challenged. |
realm | String | The authentication realm. |
username | String | The username for the authenticating user. |
password | String | The user's password. |
nonce | String | The nonce from the authentication challenge. |
cnonce | String | The client nonce (cnonce) chosen by the authenticating party, as carried in the Authorization header. |
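For reference, the Digest response calculation defined in RFC 2617 can be sketched as below. This is an assumption about what auth_header_response computes, not its actual source: sketch_digest_response is a hypothetical name, the MD5 function is supplied by the caller because standard Lua does not include one, and the nc and qop values are passed explicitly here although auth_header_response does not take them as arguments.

```lua
-- Sketch only, NOT the library's implementation.
-- md5_hex: caller-supplied function returning the lowercase hex MD5 of a string.
-- p: table with username, realm, password, method, uri, nonce,
--    and (when qop is in use) nc, cnonce and qop.
local function sketch_digest_response (md5_hex, p)
    -- HA1 covers the credentials, HA2 covers the request being authenticated.
    local ha1 = md5_hex (p.username .. ":" .. p.realm .. ":" .. p.password)
    local ha2 = md5_hex (p.method .. ":" .. p.uri)

    if p.qop then
        -- With qop (e.g. "auth"), the nonce count and client nonce are included.
        return md5_hex (table.concat ({ ha1, p.nonce, p.nc, p.cnonce, p.qop, ha2 }, ":"))
    end

    -- Without qop, the older RFC 2069 compatible form applies.
    return md5_hex (ha1 .. ":" .. p.nonce .. ":" .. ha2)
end
```

Passing the hash function in keeps the sketch free of external dependencies; in practice any MD5 routine that returns a lowercase hex string could be plugged in.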
Example: Challenge a BYE Request, and check the authentication.
-- Expect BYE at the end of a test call.
tsuo.bye_expect_request (context)
-- Ask for auth on the BYE.
local realm = 'your realm here'
local nonce = '0a4f113b' -- (Should be a unique random value, usually hex or base64.)
local www_auth_header = string.format ('Digest realm="%s",nonce="%s",algorithm=MD5,qop=auth', realm, nonce)
tsuo.bye_send_response (context, 401, "Unauthorized", { { name = 'WWW-Authenticate', value = www_auth_header } })
-- These are the credentials we expect. Of course they could be looked up from a database etc.
local expected_username = 'user'
local expected_password = 'pass'
-- Expect a BYE retry with auth.
local request = tsuo.bye_expect_request (context)
tv = match.elapsed ("BYE Re-Auth (immediate)", tv, 0.0)
-- Check that the auth is valid.
-- header_fetch returns nil if the header is missing, so guard before indexing.
local authorization_header = tsuo.header_fetch (request.headers, "Authorization")
local actual_user = authorization_header and authorization_header.values_hash['DIGEST USERNAME']
if (actual_user == expected_username) then
    match.pass ("Expected BYE Authorization Header Username", "BYE Authorization Header Username matches.")
    local expected_response = tsuo.auth_header_response (request.uri, 'BYE', realm, expected_username, expected_password, nonce, authorization_header.values_hash['CNONCE'])
    local actual_response = authorization_header.values_hash['RESPONSE']
    if (actual_response == expected_response) then
        match.pass ("Expected BYE Authorization Header Response", "BYE Authorization Header Response matches.")
        tsuo.bye_send_response (context, 200, "OK")
    else
        match.fail ("Expected BYE Authorization Header Response", "BYE Authorization Header Response does not match.")
        tsuo.bye_send_response (context, 403, "Forbidden")
    end
else
    match.fail ("Expected BYE Authorization Header Username", "BYE Authorization Header Username does not match.")
    tsuo.bye_send_response (context, 403, "Forbidden")
end
Auth Header Value
The auth_header_value method implements the algorithm for building an Authorization header based on a received WWW-Authenticate challenge header.
The method takes the following arguments:
Argument | Type | Description |
---|---|---|
context | Object | An INVITE or non-INVITE context created by invite_context or non_invite_context, or returned by invite_expect_request. |
method | String | The SIP Request method. |
www_auth_header | Object | The WWW-Authenticate header value returned from header_fetch. |
username | String | The username for the authenticating user. |
password | String | The user's password. |
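The value produced by auth_header_value is expected to follow the Digest credentials layout from RFC 2617. A minimal sketch of that layout is shown below; sketch_authorization_value and its field names are hypothetical, and the exact set and order of parameters emitted by the library may differ.

```lua
-- Sketch only, NOT the library's implementation.
-- f: table with username, realm, nonce, uri, response, nc and cnonce.
-- The realm and nonce would come from the WWW-Authenticate challenge,
-- and response from the Digest calculation over the request.
local function sketch_authorization_value (f)
    -- Per RFC 2617, algorithm, qop and nc are unquoted; the rest are quoted strings.
    return string.format (
        'Digest username="%s",realm="%s",nonce="%s",uri="%s",response="%s",algorithm=MD5,qop=auth,nc=%s,cnonce="%s"',
        f.username, f.realm, f.nonce, f.uri, f.response, f.nc, f.cnonce)
end

print (sketch_authorization_value ({
    username = "665566", realm = "example", nonce = "0a4f113b",
    uri = "sip:4000@example.com", response = "abc",
    nc = "00000001", cnonce = "xyz"
}))
```

Placing username first matches the DIGEST USERNAME key seen when such a header is read back through header_fetch.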
Example: INVITE Request is challenged, and authenticated.
local n2svcd = require "n2.n2svcd"
local utils = require "n2.utils"
local match = require "n2.n2svcd.tester.match"
local manage = require "n2.n2svcd.tester.manage"
local edr_file_agent = require "n2.n2svcd.edr_file_agent"
local tsuo = require "n2.n2svcd.tester.test_sip_agent"
local args = ...
-- Switch the authentication mode to DIGEST. BYE DOES require authentication.
n2svcd.management_configuration_scalar ('LHO', 'sip_auth_schema', 'digest')
n2svcd.management_configuration_scalar ('LHO', 'sip_open_bye', '0')
n2svcd.management_configuration_scalar ('LHO', 'sip_open_ack', '1')
-- Checkpoint stats for "Logic" and "LHO" apps.
local stats_Logic = match.stats ('Logic')
local stats_LHO = match.stats ('LHO')
-- Static for our call.
local endpoints = tsuo.default_endpoints ({ local_rtp_port = 3668 })
local calling_party = '665566'
local called_party = '4000'
local username = '665566'
local password = 'ABC'
-- Get a SIP outcall context from our helper library.
local context = tsuo.invite_context (endpoints, calling_party, called_party, {
    contact_sip_instance = '"<urn:uuid:27b843f9-d300-49fb-b108-fff03a6369e3>"',
    contact_expires = '3600'
})
-- Construct the SDP
local sdp_offer = tsuo.sdp_offer (context, tsuo.SDP_LINPHONE)
-- Start timer for elapsed checking.
local tv = match.elapsed ()
-- Construct and Send INVITE Request.
tsuo.invite_send_request (context, sdp_offer, {
    { name = 'Record-Route', value = '<sip:p2.domain.com;lr>' },
    { name = 'Record-Route', value = '<sip:p1.example.com;lr>' },
})
-- Expect INVITE response (401 Unauthorized) and ACK.
local response = tsuo.invite_expect_response (context, 401, "Unauthorized")
tsuo.invite_send_decline_ack (context)
-- Generate the authorization header for our request.
local www_auth_header = tsuo.header_fetch (response.headers, "WWW-Authenticate")
local authorization_header = tsuo.auth_header_value (context, 'INVITE', www_auth_header, username, password)
-- Re-Send INVITE Request with Authorization.
tsuo.invite_send_request (context, sdp_offer, {
    { name = 'Authorization', value = authorization_header },
    { name = 'Record-Route', value = '<sip:p2.domain.com;lr>' },
    { name = 'Record-Route', value = '<sip:p1.example.com;lr>' },
})
-- Expect Trying & Ringing.
tsuo.invite_expect_response (context, 100, "Trying", { ['User-Agent'] = "N-Squared LHO" })
tsuo.invite_expect_response (context, 180, "Ringing", {
    ['Record-Route'] = { '<sip:p2.domain.com;lr>', '<sip:p1.example.com;lr>' }
})
tv = match.elapsed ("Ring Notification (immediate)", tv, 0.0)