Statistics
| Branch: | Revision:

nfa / plugins / port_sniffer / port_sniffer_plugin.cpp @ 7faa68e1

History | View | Annotate | Download (7.99 KB)

1
#include "port_sniffer_plugin.h"
2

    
3
#include "nf.h"
4

    
5
#include <boost/lexical_cast.hpp>
6

    
7
#include <iostream>
8
#include <fstream>
9

    
10
#include <cerrno>
11
#include <cstring>
12

    
13
#include <arpa/inet.h>
14

    
15
using NFA::PortSniffer;
16
using NFA::SettingsGroup;
17
using NFA::PortsParser;
18
using NFA::ProtoPort;
19

    
20
namespace
21
{
22

    
23
class Creator
24
{
25
    public:
26
        PortSniffer* get(const NFA::SettingsGroup& s)
27
        {
28
            if (!m_plugin)
29
                m_plugin.reset(new PortSniffer(s));
30
            return m_plugin.get();
31
        }
32

    
33
    private:
34
        std::unique_ptr<PortSniffer> m_plugin;
35
} creator;
36

    
37
std::string time2ts(time_t ts)
38
{
39
    char buf[32];
40
    struct tm brokenTime;
41

    
42
    brokenTime.tm_wday = 0;
43
    brokenTime.tm_yday = 0;
44
    brokenTime.tm_isdst = 0;
45

    
46
    localtime_r(&ts, &brokenTime);
47

    
48
    strftime(buf, 32, "%Y-%m-%d %H:%M:%S", &brokenTime);
49

    
50
    return buf;
51
}
52

    
53
}
54

    
55
PortSniffer::PortSniffer(const SettingsGroup & s)
56
    : Plugin(s),
57
      _running(true),
58
      _flushThread(&PortSniffer::_flusher, this)
59
{
60
    _configureDB();
61
    _configurePort();
62
    _configureServers();
63
}
64

    
65
PortSniffer::~PortSniffer()
66
{
67
    _mutex.lock();
68
    _running = false;
69
    _mutex.unlock();
70
    _flushThread.join();
71
}
72

    
73
void PortSniffer::analyze(const NF_DATA& flow, uint32_t timeCorrection)
74
{
75
    ProtoPort pp = {flow.proto, flow.dstPort};
76
    if (_parser.check(pp)) {
77
        _mutex.lock();
78
        _flows.push_back(std::make_pair(flow, timeCorrection));
79
        _mutex.unlock();
80
    }
81
}
82

    
83
void PortSniffer::_configureDB()
84
{
85
    const SettingsGroup::const_iterator it(
86
            m_settings.find("db_conn")
87
    );
88
    if (it != m_settings.end()) {
89
        _connPtr.reset(new pqxx::connection(it->second));
90
        _checkDBStructure();
91
    }
92
}
93

    
94
void PortSniffer::_checkDBStructure()
95
{
96
    try {
97
        pqxx::work xaction(*_connPtr, "CheckDBStructure");
98

    
99
        pqxx::result res = xaction.exec("SELECT 1 FROM information_schema.tables WHERE table_name = 'tb_ports_activity'");
100

    
101
        if (res.size() == 0) {
102
            std::cerr << "PortSniffer::_checkDBStructure() - Creating DB structure" << std::endl;
103
            xaction.exec("\
104
                CREATE TABLE tb_ports_activity (\
105
                    pk_activity SERIAL PRIMARY KEY,\
106
                    user_ip INET NOT NULL,\
107
                    server_ip INET NOT NULL,\
108
                    proto INTEGER NOT NULL,\
109
                    port INTEGER NOT NULL,\
110
                    last_packet TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT 'now'\
111
                )\
112
            ");
113
            xaction.exec("\
114
                CREATE OR REPLACE FUNCTION sp_add_ports_activity (_user_ip INET, _server_ip INET, _proto INTEGER, _port INTEGER, _time TIMESTAMP WITHOUT TIME ZONE)\
115
                RETURNS INTEGER\
116
                AS $$\
117
                BEGIN\
118
                    UPDATE tb_ports_activity\
119
                    SET\
120
                        last_packet = _time\
121
                    WHERE\
122
                        user_ip = _user_ip AND\
123
                        server_ip = _server_ip AND\
124
                        port = _port AND\
125
                        proto = _proto;\
126
                    IF NOT FOUND\
127
                    THEN\
128
                        INSERT INTO tb_ports_activity\
129
                            (user_ip, server_ip, proto, port, last_packet)\
130
                        VALUES\
131
                            (_user_ip, _server_ip, _proto, _port, _time);\
132
                    END IF;\
133
                    RETURN 1;\
134
                END;\
135
                $$ LANGUAGE plpgsql;\
136
            ");
137
            xaction.commit();
138
        }
139

    
140
        _connPtr->prepare(
141
                "add_ports_activity",
142
                "SELECT sp_add_ports_activity ($1, $2, $3, $4, $5)"
143
        );
144

    
145
    }
146
    catch (std::exception & ex) {
147
        std::cerr << "PortsSniffer::_checkDBStructure() - Exception: '" << ex.what() << "'" << std::endl;
148
    }
149
}
150

    
151
void PortSniffer::_configurePort()
152
{
153
    const SettingsGroup::const_iterator it(
154
            m_settings.find("ports")
155
    );
156
    if (it != m_settings.end()) {
157
        _parser.read(it->second);
158
    }
159
}
160

    
161
void PortSniffer::_configureServers()
162
{
163
    const SettingsGroup::const_iterator it(
164
            m_settings.find("servers")
165
    );
166
    if (it == m_settings.end()) {
167
        std::cerr << "PortSniffer::_configureServers() - servers not configured" << std::endl;
168
        return;
169
    }
170
    std::ifstream inf(it->second.c_str());
171

    
172
    if (!inf.is_open()) {
173
        std::cerr << "PortSniffer::_configureServers() - cannot open servers file" << std::endl;
174
        return;
175
    }
176

    
177
    _servers.erase(_servers.begin(), _servers.end());
178

    
179
    std::string line;
180
    size_t count = 1;
181
    while (std::getline(inf, line)) {
182
        uint32_t ip;
183
        if (inet_pton(AF_INET, line.c_str(), &ip) != 1) {
184
            std::cerr << "PortSniffer::_configureServers() - error parsing file at line " << count << " ('" << line << "')" << std::endl;
185
        } else {
186
            std::vector<uint32_t>::iterator it(
187
                    std::lower_bound(
188
                        _servers.begin(),
189
                        _servers.end(),
190
                        ip)
191
            );
192
            _servers.insert(it, ip);
193
        }
194
        ++count;
195
    }
196
}
197

    
198
void PortSniffer::_flusher()
199
{
200
    time_t lastFlush = time(NULL);
201
    while (true) {
202
        _mutex.lock();
203
        if (!_running) {
204
            _mutex.unlock();
205
            break;
206
        }
207
        _mutex.unlock();
208
        time_t now = time(NULL);
209
        if (difftime(now, lastFlush) > 10) {
210

    
211
            Flows localFlows;
212

    
213
            _mutex.lock();
214
            _flows.swap(localFlows);
215
            _mutex.unlock();
216

    
217
            _flush(localFlows);
218

    
219
            lastFlush = now;
220
        }
221
        usleep(500000);
222
    }
223
}
224

    
225
void PortSniffer::_flush(const Flows & flows)
226
{
227
    try {
228
        pqxx::work xaction(*_connPtr, "Flush transaction");
229

    
230
        Flows::const_iterator it(
231
                flows.begin()
232
        );
233

    
234
        while (it != flows.end()) {
235
            if (_servers.empty()) {
236
                char srcIP[32];
237
                if (inet_ntop(AF_INET, &it->first.srcAddr, srcIP, sizeof(srcIP)) == NULL) {
238
                    std::cerr << "Source IP conversion failed: '" << strerror(errno) << "'" << std::endl;
239
                    return;
240
                }
241
                char dstIP[32];
242
                if (inet_ntop(AF_INET, &it->first.dstAddr, dstIP, sizeof(dstIP)) == NULL) {
243
                    std::cerr << "Destination IP conversion failed: '" << strerror(errno) << "'" << std::endl;
244
                    return;
245
                }
246
                xaction.prepared("add_ports_activity")(std::string(srcIP))(std::string(dstIP))(boost::lexical_cast<std::string>(static_cast<unsigned>(it->first.proto)))(boost::lexical_cast<std::string>(ntohs(it->first.dstPort)))(time2ts(ntohl(it->first.timeFinish) / 1000 + it->second)).exec();
247
            } else if (std::binary_search(
248
                        _servers.begin(),
249
                        _servers.end(),
250
                        it->first.dstAddr)) {
251
                char srcIP[32];
252
                if (inet_ntop(AF_INET, &it->first.srcAddr, srcIP, sizeof(srcIP)) == NULL) {
253
                    std::cerr << "Source IP conversion failed: '" << strerror(errno) << "'" << std::endl;
254
                    return;
255
                }
256
                char dstIP[32];
257
                if (inet_ntop(AF_INET, &it->first.dstAddr, dstIP, sizeof(dstIP)) == NULL) {
258
                    std::cerr << "Destination IP conversion failed: '" << strerror(errno) << "'" << std::endl;
259
                    return;
260
                }
261
                xaction.prepared("add_ports_activity")(std::string(srcIP))(std::string(dstIP))(boost::lexical_cast<std::string>(static_cast<unsigned>(it->first.proto)))(boost::lexical_cast<std::string>(ntohs(it->first.dstPort)))(time2ts(ntohl(it->first.timeFinish) / 1000 + it->second)).exec();
262
            }
263
            ++it;
264
        }
265

    
266
        xaction.commit();
267
    }
268
    catch (std::exception & ex) {
269
        std::cerr << "PortSniffer::_flush() - Exception: '" << ex.what() << "'" << std::endl;
270
    }
271
}
272

    
273
NFA::Plugin * getPlugin(const NFA::SettingsGroup & s)
274
{
275
    return creator.get(s);
276
}