Statistics
| Branch: | Revision:

nfa / plugins / port_sniffer / port_sniffer_plugin.cpp @ 78d38675

History | View | Annotate | Download (8.16 KB)

1
#include <arpa/inet.h>
2

    
3
#include <cerrno>
4
#include <cstring>
5

    
6
#include <iostream>
7
#include <fstream>
8

    
9
#include <boost/lexical_cast.hpp>
10

    
11
#include "port_sniffer_plugin.h"
12

    
13
using NF_ANALYZER::PortSniffer;
14
using NF_ANALYZER::SettingsGroup;
15
using NF_ANALYZER::PortsParser;
16
using NF_ANALYZER::ProtoPort;
17

    
18
std::string time2ts(time_t ts);
19

    
20
PortSniffer::PortSniffer(const SettingsGroup & s)
21
    : Plugin(s),
22
      _parser(),
23
      _servers(),
24
      _flows(),
25
      _connPtr(NULL),
26
      _running(true),
27
      _mutex(),
28
      _flushThread(&PortSniffer::_flusher, this)
29
{
30
    _configureDB();
31
    _configurePort();
32
    _configureServers();
33
}
34

    
35
PortSniffer::~PortSniffer()
36
{
37
    _mutex.lock();
38
    _running = false;
39
    _mutex.unlock();
40
    _flushThread.join();
41
}
42

    
43
void PortSniffer::analyze(const NF_DATA * flow, uint32_t timeCorrection)
44
{
45
    ProtoPort pp = {flow->proto, flow->dstPort};
46
    if (_parser.check(pp)) {
47
        _mutex.lock();
48
        _flows.push_back(std::make_pair(*flow, timeCorrection));
49
        _mutex.unlock();
50
    }
51
}
52

    
53
void PortSniffer::_configureDB()
54
{
55
    const SettingsGroup::const_iterator it(
56
            _settings.find("db_conn")
57
    );
58
    if (it != _settings.end()) {
59
        _connPtr.reset(new pqxx::connection(it->second));
60
        _checkDBStructure();
61
    }
62
}
63

    
64
void PortSniffer::_checkDBStructure()
65
{
66
    try {
67
        pqxx::work xaction(*_connPtr, "CheckDBStructure");
68
        
69
        pqxx::result res = xaction.exec("SELECT 1 FROM information_schema.tables WHERE table_name = 'tb_ports_activity'");
70

    
71
        if (res.size() == 0) {
72
            std::cerr << "PortSniffer::_checkDBStructure() - Creating DB structure" << std::endl;
73
            xaction.exec("\
74
                CREATE TABLE tb_ports_activity (\
75
                    pk_activity SERIAL PRIMARY KEY,\
76
                    user_ip INET NOT NULL,\
77
                    server_ip INET NOT NULL,\
78
                    proto INTEGER NOT NULL,\
79
                    port INTEGER NOT NULL,\
80
                    last_packet TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT 'now'\
81
                )\
82
            ");
83
            xaction.exec("\
84
                CREATE OR REPLACE FUNCTION sp_add_ports_activity (_user_ip INET, _server_ip INET, _proto INTEGER, _port INTEGER, _time TIMESTAMP WITHOUT TIME ZONE)\
85
                RETURNS INTEGER\
86
                AS $$\
87
                BEGIN\
88
                    UPDATE tb_ports_activity\
89
                    SET\
90
                        last_packet = _time\
91
                    WHERE\
92
                        user_ip = _user_ip AND\
93
                        server_ip = _server_ip AND\
94
                        port = _port AND\
95
                        proto = _proto;\
96
                    IF NOT FOUND\
97
                    THEN\
98
                        INSERT INTO tb_ports_activity\
99
                            (user_ip, server_ip, proto, port, last_packet)\
100
                        VALUES\
101
                            (_user_ip, _server_ip, _proto, _port, _time);\
102
                    END IF;\
103
                    RETURN 1;\
104
                END;\
105
                $$ LANGUAGE plpgsql;\
106
            ");
107
            xaction.commit();
108
        }
109

    
110
        _connPtr->prepare(
111
                "add_ports_activity",
112
                "SELECT sp_add_ports_activity ($1, $2, $3, $4, $5)"
113
        );
114

    
115
    }
116
    catch (std::exception & ex) {
117
        std::cerr << "PortsSniffer::_checkDBStructure() - Exception: '" << ex.what() << "'" << std::endl;
118
    }
119
}
120

    
121
void PortSniffer::_configurePort()
122
{
123
    const SettingsGroup::const_iterator it(
124
            _settings.find("ports")
125
    );
126
    if (it != _settings.end()) {
127
        _parser.read(it->second);
128
    }
129
}
130

    
131
void PortSniffer::_configureServers()
132
{
133
    const SettingsGroup::const_iterator it(
134
            _settings.find("servers")
135
    );
136
    if (it == _settings.end()) {
137
        std::cerr << "PortSniffer::_configureServers() - servers not configured" << std::endl;
138
        return;
139
    }
140
    std::ifstream inf(it->second.c_str());
141

    
142
    if (!inf.is_open()) {
143
        std::cerr << "PortSniffer::_configureServers() - cannot open servers file" << std::endl;
144
        return;
145
    }
146

    
147
    _servers.erase(_servers.begin(), _servers.end());
148

    
149
    std::string line;
150
    size_t count = 1;
151
    while (std::getline(inf, line)) {
152
        uint32_t ip;
153
        if (inet_pton(AF_INET, line.c_str(), &ip) != 1) {
154
            std::cerr << "PortSniffer::_configureServers() - error parsing file at line " << count << " ('" << line << "')" << std::endl;
155
        } else {
156
            std::vector<uint32_t>::iterator it(
157
                    std::lower_bound(
158
                        _servers.begin(),
159
                        _servers.end(),
160
                        ip)
161
            );
162
            _servers.insert(it, ip);
163
        }
164
        ++count;
165
    }
166
}
167

    
168
void PortSniffer::_flusher()
169
{
170
    time_t lastFlush = time(NULL);
171
    while (true) {
172
        _mutex.lock();
173
        if (!_running) {
174
            _mutex.unlock();
175
            break;
176
        }
177
        _mutex.unlock();
178
        time_t now = time(NULL);
179
        if (difftime(now, lastFlush) > 10) {
180

    
181
            Flows localFlows;
182

    
183
            _mutex.lock();
184
            _flows.swap(localFlows);
185
            _mutex.unlock();
186

    
187
            _flush(localFlows);
188

    
189
            lastFlush = now;
190
        }
191
        usleep(500000);
192
    }
193
}
194

    
195
void PortSniffer::_flush(const Flows & flows)
196
{
197
    char srcIP[32];
198
    char dstIP[32];
199
    try {
200
        pqxx::work xaction(*_connPtr, "Flush transaction");
201

    
202
        Flows::const_iterator it(
203
                flows.begin()
204
        );
205

    
206
        while (it != flows.end()) {
207
            if (_servers.empty()) {
208
                if (inet_ntop(AF_INET, &it->first.srcAddr, srcIP, sizeof(srcIP)) == NULL) {
209
                    std::cerr << "Source IP conversion failed: '" << strerror(errno) << "'" << std::endl;
210
                    return;
211
                }
212
                if (inet_ntop(AF_INET, &it->first.dstAddr, dstIP, sizeof(dstIP)) == NULL) {
213
                    std::cerr << "Destination IP conversion failed: '" << strerror(errno) << "'" << std::endl;
214
                    return;
215
                }
216
                xaction.prepared("add_ports_activity")(std::string(srcIP))(std::string(dstIP))(boost::lexical_cast<std::string>(static_cast<unsigned>(it->first.proto)))(boost::lexical_cast<std::string>(ntohs(it->first.dstPort)))(time2ts(ntohl(it->first.timeFinish) / 1000 + it->second)).exec();
217
            } else if (std::binary_search(
218
                        _servers.begin(),
219
                        _servers.end(),
220
                        it->first.dstAddr)) {
221
                if (inet_ntop(AF_INET, &it->first.srcAddr, srcIP, sizeof(srcIP)) == NULL) {
222
                    std::cerr << "Source IP conversion failed: '" << strerror(errno) << "'" << std::endl;
223
                    return;
224
                }
225
                if (inet_ntop(AF_INET, &it->first.dstAddr, dstIP, sizeof(dstIP)) == NULL) {
226
                    std::cerr << "Destination IP conversion failed: '" << strerror(errno) << "'" << std::endl;
227
                    return;
228
                }
229
                xaction.prepared("add_ports_activity")(std::string(srcIP))(std::string(dstIP))(boost::lexical_cast<std::string>(static_cast<unsigned>(it->first.proto)))(boost::lexical_cast<std::string>(ntohs(it->first.dstPort)))(time2ts(ntohl(it->first.timeFinish) / 1000 + it->second)).exec();
230
            }
231
            ++it;
232
        }
233

    
234
        xaction.commit();
235
    }
236
    catch (std::exception & ex) {
237
        std::cerr << "PortSniffer::_flush() - Exception: '" << ex.what() << "'" << std::endl;
238
    }
239
}
240

    
241
class PortSnifferPluginCreator
242
{
243
    public:
244
        PortSnifferPluginCreator()
245
            : plugin(NULL)
246
        {};
247
        ~PortSnifferPluginCreator()
248
        { delete plugin; };
249

    
250
        PortSniffer * get(const NF_ANALYZER::SettingsGroup & s) { return (plugin == NULL ? plugin = new PortSniffer(s) : plugin); };
251
    private:
252
        PortSniffer * plugin;
253
} portSnifferPluginCreator;
254

    
255
NF_ANALYZER::Plugin * getPlugin(const NF_ANALYZER::SettingsGroup & s)
256
{
257
    return portSnifferPluginCreator.get(s);
258
}
259

    
260
std::string time2ts(time_t ts)
261
{
262
    char buf[32];
263
    struct tm brokenTime;
264

    
265
    brokenTime.tm_wday = 0;
266
    brokenTime.tm_yday = 0;
267
    brokenTime.tm_isdst = 0;
268

    
269
    localtime_r(&ts, &brokenTime);
270

    
271
    strftime(buf, 32, "%Y-%m-%d %H:%M:%S", &brokenTime);
272

    
273
    return buf;
274
}