class msiempy.NitroSession: (source)
Single session handler and HTTP interface. The session object will handle authentication and intermittent (but annoying) SIEM errors.
It provides easier dialogue with the ESM by doing argument interpolation.
See api_request
and request
for usage.
Instance Variable | session | Underlying requests.Session object. |
Instance Variable | config | NitroConfig object. |
Instance Variable | login_info | Login user infos as returned by login API method. |
Method | __init__ | Create or get the ESM session |
Instance Variable | __dict__ | Undocumented |
Instance Variable | api_v | Undocumented |
Instance Variable | logged_in | Undocumented |
Class Variable | BASE_URL | API base url: https://{}/rs/esm/ |
Class Variable | BASE_URL_PRIV | Private API base URL: https://{}/ess/ |
Class Variable | __initiated__ | Weither the session has been intaciated. It's supposed to be a singleton. |
Class Variable | __unique_state__ | The singleton unique state. |
Class Variable | PARAMS | Private SIEM API methos/parameters mapping Dict[str, tuple] . This structure provide a central place to aggregate API methods and parameters. |
Method | __str__ | Undocumented |
Method | login | Authentication is done lazily upon the first call to msiempy.core.session.NitroSession.request method, but you can still do it manually by calling this method. |
Instance Variable | user_tz_id | Undocumented |
Instance Variable | esm_v | Undocumented |
Method | logout | This method will logout the session. |
Method | api_request | Handle a lower level HTTP request to ESM API endpoints. Make direct API calls with any data. This is useful when dealing with features of the ESM API that are not explicitly implemented in this library yet (i.e. user managment or latest API calls). |
Method | version | Returns: str ESM short version. Example: '10.0.2' |
Method | buildstamp | Returns: str ESM buildstamp. Example: '10.0.2 20170516001031' |
Method | get_internal_file | Uses the private API to retrieve, assemble and delete a temp file from the ESM. |
Method | request | Interface to make ESM API calls more simple by interpolating **kwargs arguments with NitroSession.PARAMS docstrings and build a valid datastructure for the HTTP data. |
Static Method | _init_log | Private method. Inits the session's logger settings based on params All objects should be able to log stuff, so the logger is globaly accessible |
Static Method | _format_params | Format private API call. |
Static Method | _format_priv_resp | Format response from private API. |
Static Method | _unpack_resp | Unpack data from response. Should not be necessary with API v2. |
Create or get the ESM session
config
(msiempy.core.config.NitroConfig
): Config object. Find default config if None
.bool
)
Private SIEM API methos/parameters mapping Dict[str, tuple]
.
This structure provide a central place to aggregate API methods and parameters.
The first tuple item is the SIEM API endpoint name. The second item is the JSON string data parameters required for the enpoint call.
If the tuple item is a string.Template
string, it needs to be interpolated with paramaters.
NitroSession.request
for a list of all possible calls and usage.Authentication is done lazily upon the first call to msiempy.core.session.NitroSession.request
method, but you can still do it manually by calling this method.
msiempy.core.session.NitroError
if login failsHandle a lower level HTTP request to ESM API endpoints. Make direct API calls with any data. This is useful when dealing with features of the ESM API that are not explicitly implemented in this library yet (i.e. user managment or latest API calls).
Format the request, handle the basic parsing of the SIEM result as well as other errors.
All upper cases method names signals to use the private API methods.
method
(str
): ESM API enpoint name and url formatted parametershttp
(str
): HTTP method.data
(dict
): POST data to sendcallback
(callable
): function to apply afterwardsraw
(bool
): If true will return the Response object from requests module. No retry when raw=True.secure
(bool
): If true will not log the content of the request.retry
(int
): Number of time the request can be retriedmsiempy.NitroError
if any HTTPError
Exemple:
from msiempy import NitroSession s = NitroSession() s.login() # qryGetFilterFields s.api_request('qryGetFilterFields') # Get all last 24h alarms details with ESM API v2. alarms = s.api_request('v2/alarmGetTriggeredAlarms?triggeredTimeRange=LAST_24_HOURS&status=&pageSize=500&pageNumber=1', None) for a in alarms: a.update(s.api_request('v2/notifyGetTriggeredNotificationDetail', {'id':a['id']}))
Uses the private API to retrieve, assemble and delete a temp file from the ESM.
Arguments:
file_token
(str
): File token IDInterface to make ESM API calls more simple by interpolating **kwargs
arguments with NitroSession.PARAMS
docstrings and build a valid datastructure for the HTTP data.
Then call the NitroSession.api_request
method with the built data.
Also handles auto-login.
request
(str
): Name keyword corresponding to the request name in NitroSession.PARAMS
mapping.http
(str
): HTTP method.callback
(callable
): function to apply afterwardsraw
(bool
): If true will return the Response object from requests module.secure
(bool
): If true will not log the content of the request.retry
(int
): Number of time the request can be retried**kwargs
: Interpolation parameters that will be match to NitroSession.PARAMS
templates. Dynamic keyword arguments.Exemple:
from msiempy import NitroSession s = NitroSession() s.login() # Get all last 24h alarms details alarms = s.request('get_alarms', time_range='LAST_24_HOURS', status='', page_size=500, page_number=0) for a in alarms: a.update(s.request('get_notification_detail', id=a['id']))
All upper cases method names signals to use the private API methods.
>>> s.request("login", username, password) # Call login >>> s.request("get_devtree", ) # Call GRP_GETVIRTUALGROUPIPSLISTDATA >>> s.request("get_zones_devtree", ) # Call GRP_GETVIRTUALGROUPIPSLISTDATA >>> s.request("req_client_str", ds_id) # Call DS_GETDSCLIENTLIST >>> s.request("get_rfile", ftoken) # Call MISC_READFILE >>> s.request("del_rfile", ftoken) # Call ESSMGT_DELETEFILE >>> s.request("get_rfile2", ftoken, pos, nbytes) # Call MISC_READFILE >>> s.request("get_wfile", ds_id) # Call MISC_WRITEFILE >>> s.request("get_rule_history", ) # Call PLCY_GETRULECHANGEINFO >>> s.request("add_ds_11_1_3", parent_id, name, ds_ip, type_id, zone_id, enabled, url, ds_id, child_enabled, child_count, child_type, idm_id, parameters) # Call dsAddDataSource >>> s.request("add_ds_11_2_1", parent_id, name, ds_ip, type_id, zone_id, enabled, url, parameters) # Call dsAddDataSources >>> s.request("add_client1", parent_id, name, enabled, ds_ip, hostname, type_id, tz_id, dorder, maskflag, port, require_tls) # Call DS_ADDDSCLIENT >>> s.request("get_recs", ) # Call devGetDeviceList >>> s.request("get_dstypes", rec_id) # Call dsGetDataSourceTypes >>> s.request("del_ds1", parent_id, ds_id) # Call dsDeleteDataSource >>> s.request("del_ds2", parent_id, ds_id) # Call dsDeleteDataSources >>> s.request("del_client", parent_id, ftoken) # Call DS_DELETEDSCLIENTS >>> s.request("get_job_status", job_id) # Call MISC_JOBSTATUS >>> s.request("ds_last_times", ) # Call QRY_GETDEVICELASTALERTTIME >>> s.request("zonetree", ) # Call zoneGetZoneTree >>> s.request("ds_by_type", ) # Call QRY_GETDEVICECOUNTBYTYPE >>> s.request("ds_details1", ds_id) # Call dsGetDataSourceDetail >>> s.request("ds_details2", ds_id) # Call dsGetDataSourceDetail >>> s.request("get_alarms_custom_time", time_range, start_time, end_time, status, page_size, page_number) # Call alarmGetTriggeredAlarms >>> s.request("get_alarms", time_range, status, page_size, page_number) # Call alarmGetTriggeredAlarms >>> s.request("get_notification_detail", id) # Call notifyGetTriggeredNotificationDetail >>> s.request("get_alarm_details", id) # Call notifyGetTriggeredNotification >>> s.request("get_alarm_details_int", id) # Call NOTIFY_GETTRIGGEREDNOTIFICATIONDETAIL >>> s.request("ack_alarms", ids) # Call alarmAcknowledgeTriggeredAlarm >>> s.request("ack_alarms_11_2_1", ids) # Call alarmAcknowledgeTriggeredAlarm >>> s.request("unack_alarms", ids) # Call alarmUnacknowledgeTriggeredAlarm >>> s.request("unack_alarms_11_2_1", ids) # Call alarmUnacknowledgeTriggeredAlarm >>> s.request("delete_alarms", ids) # Call alarmDeleteTriggeredAlarm >>> s.request("delete_alarms_11_2_1", ids) # Call alarmDeleteTriggeredAlarm >>> s.request("get_alerts_now", ds_id) # Call IPS_GETALERTSNOW >>> s.request("get_flows_now", ds_id) # Call IPS_GETFLOWSNOW >>> s.request("get_possible_filters", ) # Call v2/qryGetFilterFields >>> s.request("get_possible_fields", type, groupType) # Call v2/qryGetSelectFields >>> s.request("event_query_custom_time", time_range, start_time, end_time, fields, filters, limit, offset, order_field, order_direction) # Call v2/qryExecuteDetail >>> s.request("event_query", time_range, fields, filters, limit, offset, order_field, order_direction) # Call v2/qryExecuteDetail >>> s.request("query_status", resultID) # Call v2/qryGetStatus >>> s.request("query_result", startPos, numRows, resultID) # Call v2/qryGetResults >>> s.request("grouped_event_query", filters, field, time_range) # Call v2/qryExecuteGrouped >>> s.request("grouped_event_query_custom_time", filters, field, time_range, start_time, end_time) # Call v2/qryExecuteGrouped >>> s.request("close_query", resultID) # Call v2/qryClose >>> s.request("get_alert_data", id) # Call ipsGetAlertData >>> s.request("add_note_to_event", id, note) # Call ipsAddAlertNote >>> s.request("add_note_to_event_int", id, note) # Call IPS_ADDALERTNOTE >>> s.request("get_wl_types", ) # Call sysGetWatchlistFields >>> s.request("get_watchlists_no_filters", hidden, dynamic, writeOnly, indexedOnly) # Call sysGetWatchlists >>> s.request("get_watchlist_details", id) # Call sysGetWatchlistDetails >>> s.request("add_watchlist", name, wl_type) # Call sysAddWatchlist >>> s.request("add_watchlist_values", watchlist, values) # Call sysAddWatchlistValues >>> s.request("remove_watchlist_values", watchlist, values) # Call sysRemoveWatchlistValues >>> s.request("get_watchlist_values", id) # Call SYS_GETWATCHLISTDETAILS >>> s.request("remove_watchlists", wl_id_list) # Call sysRemoveWatchlist >>> s.request("get_user_locale", ) # Call getUserLocale >>> s.request("time_zones", ) # Call userGetTimeZones >>> s.request("logout", ) # Call logout >>> s.request("get_sys_info", ) # Call SYS_GETSYSINFO >>> s.request("get_esm_time", ) # Call essmgtGetESSTime >>> s.request("build_stamp", ) # Call essmgtGetBuildStamp
Unpack data from response. Should not be necessary with API v2.
requests.Response
response object