From: Hamatoma Date: Tue, 29 Jun 2021 19:54:59 +0000 (+0200) Subject: daily work X-Git-Url: https://gitweb.hamatoma.de/?a=commitdiff_plain;h=8aeeb557915cadbae76765d508173f56f010aca4;p=exhibition.git daily work --- diff --git a/rest_server/.gitignore b/rest_server/.gitignore new file mode 100644 index 0000000..3c8a157 --- /dev/null +++ b/rest_server/.gitignore @@ -0,0 +1,6 @@ +# Files and directories created by pub. +.dart_tool/ +.packages + +# Conventional directory for build output. +build/ diff --git a/rest_server/CHANGELOG.md b/rest_server/CHANGELOG.md new file mode 100644 index 0000000..effe43c --- /dev/null +++ b/rest_server/CHANGELOG.md @@ -0,0 +1,3 @@ +## 1.0.0 + +- Initial version. diff --git a/rest_server/README.md b/rest_server/README.md new file mode 100644 index 0000000..3816eca --- /dev/null +++ b/rest_server/README.md @@ -0,0 +1,2 @@ +A sample command-line application with an entrypoint in `bin/`, library code +in `lib/`, and example unit test in `test/`. diff --git a/rest_server/analysis_options.yaml b/rest_server/analysis_options.yaml new file mode 100644 index 0000000..dee8927 --- /dev/null +++ b/rest_server/analysis_options.yaml @@ -0,0 +1,30 @@ +# This file configures the static analysis results for your project (errors, +# warnings, and lints). +# +# This enables the 'recommended' set of lints from `package:lints`. +# This set helps identify many issues that may lead to problems when running +# or consuming Dart code, and enforces writing Dart using a single, idiomatic +# style and format. +# +# If you want a smaller set of lints you can change this to specify +# 'package:lints/core.yaml'. These are just the most critical lints +# (the recommended set includes the core lints). +# The core lints are also what is used by pub.dev for scoring packages. + +include: package:lints/recommended.yaml + +# Uncomment the following section to specify additional rules. + +# linter: +# rules: +# - camel_case_types + +# analyzer: +# exclude: +# - path/to/excluded/files/** + +# For more information about the core and recommended set of lints, see +# https://dart.dev/go/core-lints + +# For additional information about configuring this file, see +# https://dart.dev/guides/language/analysis-options diff --git a/rest_server/bin/rest_server.dart b/rest_server/bin/rest_server.dart new file mode 100644 index 0000000..d8dd6e9 --- /dev/null +++ b/rest_server/bin/rest_server.dart @@ -0,0 +1,6 @@ +import 'package:rest_server/services.dart'; + +/// Parses the program arguments and executes the services. +void main(List args) async { + await run(args); +} diff --git a/rest_server/data/sql/users.sql.yaml b/rest_server/data/sql/users.sql.yaml new file mode 100644 index 0000000..12bda83 --- /dev/null +++ b/rest_server/data/sql/users.sql.yaml @@ -0,0 +1,24 @@ +--- +# SQL statements of the module "Users": +module: Users +list: + type: list + params: [] + sql: select * from loginusers; +byId: + type: query + params: [ ":id" ] + sql: "select * from loginusers where user_id=?;" +update: + type: update + params: [":name", ":displayname", ":email", ":changedby"] + sql: "UPDATE loginusers SET user_name=?, user_displayname=?, user_email=?, user_changed=NOW(), user_changedby=? + WHERE user_id=?;" +insert: + type: insert + params: [":name", ":displayname", ":email", ":createdby"] + sql: "INSERT INTO loginusers(user_name, user_displayname, user_email, user_changedby) + VALUES(?, ?, ?, NOW(), ?);" + + + diff --git a/rest_server/lib/rest_server.dart b/rest_server/lib/rest_server.dart new file mode 100644 index 0000000..03f9b16 --- /dev/null +++ b/rest_server/lib/rest_server.dart @@ -0,0 +1,988 @@ +import 'dart:async'; +import 'dart:convert' as convert; +import 'dart:io'; +import 'dart:isolate'; + +import 'package:dart_bones/dart_bones.dart'; +import 'package:http/http.dart' as http; +import 'package:crypto/crypto.dart'; +import 'package:path/path.dart' as path; +import 'package:args/args.dart'; + +const forbidden = 'forbidden'; +const wrongData = 'wrong data'; +const wrongParameters = 'wrong parameters'; + +/// Returns the MD5 hash of [text]. +String buildMd5Hash(String text) { + final rc = md5.convert(convert.utf8.encode(text)).toString(); + return rc; +} + +/// Starts an isolate instance with the channel [isolateToMainStream]. +void runIsolate(SendPort isolateToMainStream) { + var mainToIsolateStream = ReceivePort(); + isolateToMainStream.send(mainToIsolateStream.sendPort); + + mainToIsolateStream.listen((data) { + if (data is String && data.startsWith('WorkerParameter')) { + final params = WorkerParameters.fromString(data); + final worker = + ServiceWorker(params.id, params.configuration, params.serviceName); + if (params.id == 1) { + worker.observe(); + } else { + worker.listen(); + } + } + }); + + isolateToMainStream.send('This is from myIsolate()'); +} + +/// Implements a modified REST server. +/// That is a HTTP server serving only the method POST (for queries and storages). +/// The server is multi threaded by isolates. +class RestServer { + static const _version = 'V2021.06.29.00'; + static BaseLogger? unittestLogger; + BaseLogger logger = MemoryLogger(LEVEL_FINE); + BaseConfiguration configuration = BaseConfiguration({}, globalLogger); + var clientSessionTimeout = 15 * 60; + String serviceName = 'exhibition'; + RestServer( + int port, + this.logger, { + String address = 'localhost', + int sessionTimeout = 300, + int answerDumpLength = 80, + int dataDumpLength = 200, + String db = 'appexhibition', + String dbUser = 'exhibition', + String dbCode = 'TopSecret', + String dbHost = 'localhost', + int dbPort = 3306, + int watchDogPause = 60, + int traceDataLength = 80, + clientSessionTimeout = 15 * 60, + String dataDirectory = 'data', + }) { + if (unittestLogger != null) { + logger = unittestLogger!; + } + configuration = BaseConfiguration({ + 'service': { + 'port': port, + 'watchDogPause': watchDogPause, + 'address': address, + 'dataDirectory': dataDirectory, + 'threads': Platform.numberOfProcessors, + }, + 'trace': { + 'answerLength': answerDumpLength, + 'sqlLength': dataDumpLength, + }, + 'db': { + 'db': db, + 'user': dbUser, + 'code': dbCode, + 'host': dbHost, + 'port': dbPort, + 'traceDataLength': traceDataLength, + }, + 'clientSessionTimeout': clientSessionTimeout, + }, logger); + } + + /// Konstruktor, der die Parameter aus der YAML-Konfigurationsdatei + /// [filename] bezieht. + /// + RestServer.fromConfig(String filename, {this.serviceName = 'exhibition'}) { + final logger2 = unittestLogger ?? MemoryLogger(); + configuration = Configuration.fromFile(filename, logger2); + final logFile = configuration.asString('logFile', + section: 'service', + defaultValue: '/var/log/local/$serviceName.log') ?? + ''; + final level = + configuration.asInt('logLevel', section: 'service') ?? LEVEL_SUMMERY; + logger = unittestLogger ?? Logger(logFile, level); + globalLogger = logger; + ProcessSync(logger); + FileSync.initialize(logger); + logger.log( + 'db: ${configuration.asString('db', section: 'db')}', LEVEL_DETAIL); + } + + /// Initialisiert eine Isolate-Instanz. + Future initIsolate() async { + Completer completer = Completer(); + var isolateToMainStream = ReceivePort(); + + isolateToMainStream.listen((data) { + if (data is SendPort) { + var mainToIsolateStream = data; + completer.complete(mainToIsolateStream); + } + }); + await Isolate.spawn(runIsolate, isolateToMainStream.sendPort); + return completer.future as dynamic; + } + + /// Startet die Isolate-Instanzen des Webservers. + void run() async { + final cpus = Platform.numberOfProcessors; + var count = configuration.asInt('threads', section: 'service') ?? cpus; + count = count <= 0 ? 1 : count; + for (var ix = 1; ix <= count; ix++) { + var mainToIsolateStream = await initIsolate(); + mainToIsolateStream.send( + WorkerParameters(ix + 1, configuration, serviceName).asString()); + } + final mainToIsolateStream = await initIsolate(); + // der letzte Worker ist der Observer: + mainToIsolateStream + .send(WorkerParameters(1, configuration, serviceName).asString()); + } + + /// Starts the daemon process. + /// [args] contains the program arguments service name and user, + /// e.g. ['pollsam', 'pollsam']. + /// [results] contains the options. + static void daemon(List args, ArgResults results) { + final service = args.isEmpty ? 'pollsam' : args[0]; + var filename = results['configuration']; + if (filename == '/etc/pollsam/pollsam.yaml' && service != 'pollsam') { + filename = '/etc/pollsam/$service.yaml'; + } + if (!File(filename).existsSync()) { + print('+++ missing $filename'); + unittestLogger?.error('missing configuration: $filename'); + } else { + final server = RestServer.fromConfig(filename, serviceName: service); + server.run(); + } + } + + static String exampleConfiguration() { + final rc = '''--- +# Example configuration file for the rest server. Created by rest_server. +service: + address: 0.0.0.0 + port: 58021 + dataDirectory: /var/cache/rest_server/data + threads: 2 + watchDogPause: 60 + # logFile: /var/log/local/pollsam.log +trace: + answerLength: 200 +db: + db: apppolladm + user: jonny + code: "Top Secret" + host: localhost + port: 3306 + timeout: 30 + traceDataLength: 200 +clientSessionTimeout: 900 +# put this content to /etc/pollsam/pollsam.yaml +'''; + return rc; + } + + /// Installs the application as systemd service. + static void install(List args) { + final logger = globalLogger; + final service = OsService(logger); + final userInfo = UserInfo(); + final processSync = ProcessSync.initialize(logger); + final fileSync = FileSync.initialize(logger); + if (!userInfo.isRoot) { + logger.error('Be root!'); + } else { + final appName = args.length >= 2 ? args[1] : 'pollsam'; + var executable = + path.absolute(args.length > 1 ? args[0] : Platform.executable); + if (!executable.startsWith(Platform.pathSeparator)) { + executable = processSync.executeToString('which', [executable]).trim(); + } + final fnConfig = '/etc/pollsam/$appName.yaml'; + fileSync.ensureDirectory(path.dirname(fnConfig)); + if (File(fnConfig).existsSync()) { + logger.log('= $fnConfig already exists.'); + } else { + logger.log('= creating $fnConfig'); + fileSync.toFile(fnConfig, exampleConfiguration()); + } + service.installService( + appName, + starter: executable, + user: appName, + group: appName, + description: 'A REST server serving the POLLECTOR project', + ); + } + } + + /// Uninstalls the application as systemd service. + static void uninstall(List args) { + final logger = BaseLogger(LEVEL_DETAIL); + ProcessSync.initialize(logger); + final appName = args.isEmpty ? 'pollsam' : args[0]; + final service = OsService(logger); + service.uninstallService(appName, user: appName, group: appName); + } + + static String version() => _version; +} + +/// Datenklasse für eine Isolate-Instanz. +class ServiceWorker { + /// for unittests: + static int maxRequests = 0; + String nextBody = ''; + final BaseConfiguration configuration; + final int threadId; + final String serviceName; + int clientSessionTimeout = 30; + BaseLogger logger = globalLogger; + bool requestedCodecIsLatin1 = false; + HttpRequest? currentRequest; + String what = ''; + MySqlDb? db; + String restVersion = ''; + FileSync? _fileSync = FileSync(); + + ServiceWorker(this.threadId, this.configuration, this.serviceName) { + final fnLog = '/var/log/local/$serviceName.$threadId.log'; + logger = RestServer.unittestLogger ?? + Logger(fnLog, + configuration.asInt('logLevel', section: 'service') ?? LEVEL_FINE); + logger.log('db: ${configuration.asString('db', section: 'db')}'); + db = MySqlDb.fromConfiguration(configuration, logger); + clientSessionTimeout = configuration.asInt('clientSessionTimeout') ?? 30; + _fileSync = FileSync(logger); + } + + /// Checks whether a valid connection is available. If not a reconnection is + /// done. + Future checkConnection() async { + var ready = false; + final sql = 'select count(*) from loginusers;'; + int? count; + try { + count = await db!.readOneInt(sql); + } catch (exc) { + logger.error(exc.toString()); + ready = true; + } + if (!ready && (count ?? 0) <= 0) { + logger.log('Datenbank wird neu verbunden', LEVEL_DETAIL); + db = MySqlDb.fromConfiguration(configuration, logger); + await db?.connect(); + } + } + + /// Prüft die Session-ID. + /// Liefert true, wenn OK, false sonst. + Future checkSession(Map parameters) async { + var rc = false; + if (parameters.containsKey('sessionid')) { + final id = parameters['sessionid']; + final sql = '''SELECT + connection_id, connection_apiaristid, connection_start +FROM connections +WHERE + connection_name=? + AND connection_start >= NOW() - INTERVAL $clientSessionTimeout SECOND +; +'''; + final record = await db!.readOneAsMap(sql, params: [id]); + if (record == null) { + logger.log('ungültige Session: $id', LEVEL_DETAIL); + } else { + logger.log( + 'Session: $id apiarist: ${record['connection_apiaristid']} start: ${record['connection_start']}', + LEVEL_DETAIL); + rc = true; + final sql2 = '''UPDATE connections SET + connection_requests=connection_requests+1, + connection_end=NOW() +WHERE + connection_name=? +; +'''; + await db!.updateOne(sql2, params: [id]); + } + } + return rc; + } + + /// Returns the content of a POST request as map. + Future contentAsMap() async { + var content = await contentOf(); + Map map; + if (content.isEmpty) { + map = {}; + } else { + try { + map = convert.jsonDecode(content) as Map; + } on FormatException catch (exc) { + logger.error('json decoding failed:\ncontent\n$exc'); + rethrow; + } + } + return map; + } + + /// Fetches the content of the POST request as String. + Future contentOf() async { + String content; + if (nextBody.isNotEmpty) { + content = nextBody; + } else { + var data = requestedCodecIsLatin1 + ? convert.latin1.decoder.bind(currentRequest!) + : convert.utf8.decoder.bind(currentRequest!); + content = await data.join(); + } + return content; + } + + /// Returns the content of the file [node] in the [directory] + /// or null, if that file does not exist. + Future getFile(String directory, String node) async { + final fn = path.join(directory, node); + final file = File(fn); + final rc = await file.exists() ? null : await file.readAsString(); + return rc; + } + + /// Bearbeitet die Anforderung 'hives': + /// Liefert Map mit Bienenstockinfo oder [forbidden], wenn Session-ID unbekannt. + Future queryData(Map parameters) async { + String rc; + final sql = '''SELECT * +FROM hives +WHERE hive_apiaryid=( + SELECT apiarist_apiaryid FROM apiarists + WHERE apiarist_id=( + SELECT connection_apiaristid FROM connections + WHERE connection_name=?) + ) +ORDER BY hive_name +; +'''; + final params = [parameters['sessionid']]; + final records = await db!.readAll(sql, params: params); + if (records == null) { + rc = wrongData; + } else { + final list = >[]; + records.forEach((record) => list.add({ + 'hiveid': record['hive_id'], + 'name': record['hive_name'], + 'lat': record['hive_latitude'], + 'long': record['hive_longitude'] + })); + rc = convert.jsonEncode({'list': list}); + } + return rc; + } + + /// Bearbeitet die Anforderung 'sessionid': + /// Prüfen der Lizenz ("Token"). Wenn erfolgreich, wird die Session + /// in die DB eingetragen. + /// Liefert eine JSon-Map (als String) oder null, wenn Fehler. + Future getSessionId(Map parameters) async { + String? rc; + var sql = '''SELECT apiarist_id +FROM apiarists +WHERE apiarist_token=?; +'''; + final token = parameters['token']; + final params = [token]; + final apiaristId = await db!.readOneInt(sql, params: params); + if (apiaristId == null) { + logger.log('unknown token: $token', LEVEL_DETAIL); + } else { + sql = '''INSERT + INTO connections + (connection_name, connection_apiaristid, connection_start, connection_end, connection_requests) + VALUES (?, ?, NOW(), NOW(), 0); + '''; + + /// Use only 31 bit (non negativ numbers on 32 bit clients): + final sessionId = int.parse( + buildMd5Hash( + token + DateTime.now().microsecondsSinceEpoch.toString()) + .substring(0, 8), + radix: 16) & + 0x7fffffff; + final params2 = [sessionId, apiaristId]; + final id = await db!.insertOne(sql, params: params2); + if (id <= 0) { + logger.error('insert into connection failed: $token'); + } else { + logger.log('sessionid: $sessionId', LEVEL_DETAIL); + rc = convert.jsonEncode({'sessionid': sessionId}); + } + } + return rc; + } + + /// Liefert zu einem [name] das SQL-Statement. + /// [paramCount] ist die Anzahl der Parameter. Dient zur Konsistenzprüfung. + /// Liefert null oder das SQL-Statement + String? getSql(String name, int paramCount) { + String? sql; + var expectedCount = 0; + switch (name) { + case 'hives': + sql = '''SELECT + * +FROM hives +WHERE hive_apiaryid=:id +'''; + expectedCount = 1; + break; + case 'apiaristByToken': + sql = '''SELECT + * +FROM apiary +WHERE + apiarist_token=:token +'''; + expectedCount = 1; + break; + default: + logger.error('unknown SQL name: $name'); + break; + } + if (expectedCount != paramCount) { + logger.error( + 'unexpected parameter count in $name: $paramCount instead of $expectedCount'); + } + return sql; + } + + /// Bearbeitet die Anforderung 'register': + /// Test, ob der Name in den Parametern bei den Imkern existiert. + /// Wenn ja, wird der Token geliefert, sonst null. + Future getToken(Map parameters) async { + String rc; + final sql = '''SELECT apiarist_token +FROM apiarists +WHERE apiarist_registername=?; +'''; + final name = parameters['name']; + final params = [name]; + final token = await db!.readOneString(sql, params: params); + if (token == null) { + logger.log('unknown name: $name', LEVEL_DETAIL); + rc = wrongData; + } else { + rc = convert.jsonEncode({'token': token}); + } + return rc; + } + + /// Handles a POST request. + /// Extracts the request name and calls the matching method. + /// Returns the answer that should be sent to the client: A string or a map. + Future handlePost() async { + final parameters = stripDelimiters(await contentAsMap()); + var withSession = false; + String? rc; + if (what != 'watchdog') { + await checkConnection(); + } + try { + if (withSession && what != 'sessionid' && + what != 'register' && + what != 'watchdog' && + !await checkSession(parameters)) { + rc = forbidden; + } else { + switch (what) { + case 'watchdog': + rc = await watchdog(parameters); + break; + case 'sessionid': + rc = await getSessionId(parameters); + break; + case 'store': + rc = await storeData(parameters); + break; + case 'query': + rc = await queryData(parameters); + break; + case 'register': + rc = await getToken(parameters); + break; + default: + logger.error('unknown request: $what'); + break; + } + } + } on Exception catch (exc) { + logger.error('exception thrown: $exc'); + } + return rc; + } + + /// Waits for requests of the [server] and calls the handlers. + Future handleRequest(HttpRequest request) async { + currentRequest = request; + try { + switch (request.method) { + case 'POST': + prepareResource(); + final answer = await handlePost(); + await writeAnswer(answer, request); + break; + default: + logger.log('unexpected METHOD: ${request.method}', LEVEL_FINE); + break; + } + } on Error catch (exc2, trace) { + logger.error('$exc2', stackTrace: trace); + } on Exception catch (exc, trace) { + logger.error('$exc', stackTrace: trace); + } + } + + /// Does asynchronous initializations. + /// @precondition: Must be called after the constructor! + Future initAsync() async { + await db!.connect(); + } + + /// Connects to a listening address/port and waits for requests. + Future listen() async { + var doService = true; + final port = configuration.asInt('port', section: 'service') ?? 58011; + final address = configuration.asString('address', + section: 'service', defaultValue: '0.0.0.0'); + + try { + await initAsync(); + var server = await HttpServer.bind(address, port, shared: true); + logger.log('[$threadId]: Listening on $address:$port'); + var requestNo = 0; + if (doService) { + await for (HttpRequest request in server) { + await handleRequest(request); + await request.response.close(); + if (!doService || maxRequests > 0 && ++requestNo > maxRequests) { + break; + } + } + } + } on Exception catch (exc) { + logger.error(exc.toString()); + } + logger.log('thread $threadId stopped'); + } + + /// Prüft, ob ein Isolate auf eine Watchdog-Anfrage antwortet. + /// Hintergrund: Wenn eine DB-Verbindung abbricht, tritt beim nächsten + /// DB-Zugriff eine SocketException auf, die nicht abgefangen werden kann. + /// Der Thread (Isolate) ist dann tot. + /// Daher wird vom Observer-Thread regelmäßig eine Anfrage verschickt. + /// Kommt keine Antwort, ist kein Empfangsthread mehr erreichbar, das Programm + /// wird mittels exit() abgebrochen, damit es von SystemD erneut gestartet + /// wird. + Future observe() async { + final duration = Duration( + seconds: + configuration.asInt('watchDogPause', section: 'service') ?? 60); + logger.log('watchdog pause: ${duration.inSeconds}', LEVEL_DETAIL); + var counter = 0; + while (true) { + await Future.delayed(duration); + final answer = await runRequest('watchdog', body: '{"wait": 1}'); + if (++counter == 1) { + logger.log('first watchdog request: $answer', LEVEL_DETAIL); + } + if (answer != 'OK') { + logger.error('watchdog failed: aborting! [$answer]'); + // Wir beenden das Programm, damit es von SystemD neu gestartet wird. + exit(1); + } + } + } + + /// Ermittelt die Anfragenklasse ("what"), die Teil der URI ist. + void prepareResource({bool withId = true}) { + if (!(currentRequest?.requestedUri.path.contains('watchdog') ?? false)) { + logger.log(currentRequest!.requestedUri.path, LEVEL_FINE); + } + var list = currentRequest!.requestedUri.path.split('/'); + if (list.isNotEmpty && list[0].isEmpty) { + list = list.sublist(1); + } + // URI: // + if (list.length < 2) { + throw Exception('illegal request URI: ${list.join("/")}'); + } + what = list[0]; + restVersion = list[1]; + } + + /// Fordert per POST eine Aktion an. + /// [what] legt die Aktion fest. + /// [body] ist der Text, der mit der Anfrage mitgeliefert wird. + /// [headers] sind HTTP-Headerkomponenten. + /// Liefert die Antwort der Anfrage. + Future runRequest(String what, + {String? body, Map? headers}) async { + final port = configuration.asInt('port', section: 'service') ?? 58011; + var rc = ''; + final uri = + Uri(scheme: 'http', host: 'localhost', port: port, path: '/$what/1.0'); + http.Response response; + // logger.log('request: POST $uri', LEVEL_LOOP); + try { + response = await http.post(uri, body: body, headers: headers); + // logger.log('status: ${response.statusCode}', LEVEL_LOOP); + if (response.statusCode != 200) { + logger.error('$uri: status: ${response.statusCode}'); + } else { + rc = response.body; + } + } on SocketException catch (exc) { + logger.error('$exc'); + rc = '+ $exc'; + } on Exception catch (exc) { + logger.error('$exc'); + } + return rc; + } + + void setRequestData(String what, String restVersion) { + this.what = what; + this.restVersion = restVersion; + } + + /// Speichert [content] im Verzeichnis [directory] unter dem Namen [node]. + Future storeFile(String directory, String node, String content) async { + if (directory.isEmpty) { + logger.error('storeFile: missing data directory'); + } else { + _fileSync!.ensureDirectory(directory); + final fn = path.join(directory, node); + await File(fn).writeAsString(content); + } + } + + /// Bearbeitet die Anforderung 'sample': Speichern einer Probe. + Future storeData(Map parameters) async { + String rc; + if (!testParam( + ['module', 'sql', 'sampleid', 'sessionid'], parameters, 'storeData')) { + rc = wrongParameters; + } else { + + final sql = '''INSERT + INTO samples + (sample_code, sample_timestamp, sample_hiveid, sample_uuid, sample_connection, + sample_apiaristid, sample_raw, sample_encoding, created, createdby) + VALUES (?, ?, ?, ?, ?, + (SELECT MAX(connection_apiaristid) FROM connections WHERE connection_name=?), + ?, ?, NOW(), 'POLLSAM'); + '''; + final connection = parameters['sessionid']; + final params = [ + parameters['sampleid'], + parameters['time'], + parameters['hiveid'], + parameters['uuid'], + connection, + connection, + parameters.containsKey('raw') ? parameters['raw'] : null, + parameters.containsKey('encoding') ? parameters['encoding'] : null, + ]; + final id = await db!.insertOne(sql, params: params); + if (id <= 0) { + logger.error('insert into samples failed: ${parameters['uuid']}'); + } else { + logger.log('sample: $id', LEVEL_DETAIL); + } + final hive = parameters['hiveid'].toString(); + final data = { + 'uuid': parameters['uuid'], + 'time': parameters['time'], + 'sampleid': parameters['sampleid'], + 'hiveid': hive, + }; + final content = convert.jsonEncode(data); + await storeFile( + configuration.asString( + 'dataDirectory', + section: 'service', + ) ?? + '', + parameters['uuid'] as String, + content); + rc = 'OK'; + } + return rc; + } + + /// Liefert eine Kopie von [source] mit Schlüsseln und Werten ohne Delimiter. + /// Beispiel: { "'a'": '"1"' } => { "a": '1' } + Map stripDelimiters(Map source) { + final rc = {}; + for (var key in source.keys) { + var value = source[key]; + final valueLength = value is String ? value.length : 0; + final value2 = valueLength == 0 ? null : (value as String); + if (valueLength > 0 && + value2 != null && + (value2.startsWith('"') || value2.startsWith("'")) && + value2.endsWith(value2[0])) { + value = value2.substring(1, value2.length - 1); + } + if ((key.startsWith('"') || key.startsWith("'")) && + key.length > 2 && + key.endsWith(key[0])) { + key = key.substring(1, key.length - 1); + } + rc[key] = value; + } + return rc; + } + + /// Tests, whether the [map] contains given [keys]. + /// Return true, if all keys are available. + bool testParam(List keys, Map map, String caller) { + var rc = true; + for (var key in keys) { + if (!map.containsKey(key)) { + logger.error('missing key "$key" in $caller'); + rc = false; + } + } + return rc; + } + + /// Returns 'OK' after a given timeout. + /// The timeout is used to request other instances (isolates) (listening on + /// shared ports). + /// Returns 'OK' after a given timeout. + Future watchdog(Map parameters) async { + final rc = 'OK'; + final wait = parameters['wait']; + await Future.delayed(Duration(seconds: wait), () => '1'); + return rc; + } + + /// Writes an [answer] to the client via [request]. + Future writeAnswer(answer, request) async { + if (answer != null) { + if (answer is Iterable || answer is Map) { + answer = convert.jsonEncode(answer); + } + if (logger.logLevel >= LEVEL_FINE && + !request.requestedUri.path.contains('watchdog')) { + logger.log('[$threadId]: answer: ' + + limitString( + answer, configuration.asInt('answerLength', section: 'trace'))); + } + // Preventing "XMLHttpRequest error" + // do not use with credentials! + request.response.headers.add('Access-Control-Allow-Origin', '*'); + request.response + ..statusCode = HttpStatus.ok + ..write(answer); + await request.response.close(); + } + } +} + +/// Data class for storage of configuration data of a [ServiceWorker]. +class WorkerParameters { + int id; + BaseConfiguration configuration; + String serviceName; + + WorkerParameters(this.id, this.configuration, this.serviceName); + + /// Decodes a string built with asString() into an instance. + factory WorkerParameters.fromString(String data) { + final parts = data.split('\t'); + const baseService = 2; + const baseTrace = baseService + 4; + const baseDb = baseTrace + 1; + const baseRest = baseDb + 7; + final configuration = BaseConfiguration({ + 'service': { + 'address': parts[baseService], + 'port': parts[baseService + 1], + 'watchDogPause': parts[baseService + 2], + 'dataDirectory': parts[baseService + 3] + }, + 'trace': { + 'answerLength': int.parse(parts[baseTrace]), + }, + 'db': { + 'db': parts[baseDb], + 'user': parts[baseDb + 1], + 'code': parts[baseDb + 2], + 'host': parts[baseDb + 3], + 'port': int.parse(parts[baseDb + 4]), + 'timeout': int.parse(parts[baseDb + 5]), + 'traceDataLength': int.parse(parts[baseDb + 6]), + }, + 'clientSessionTimeout': int.parse(parts[baseRest]), + }, MemoryLogger()); + return WorkerParameters( + int.parse(parts[1]), configuration, parts[baseRest + 1]); + } + + /// Encodes the relevant instance data into a string. + /// Decode that with fromString(). + String asString() { + const section = 'service'; + const section2 = 'trace'; + const section3 = 'db'; + final rc = [ + 'WorkerParameters', + id.toString(), + configuration.asString('address', section: section) ?? 'localhost', + (configuration.asInt('port', section: section) ?? 58011).toString(), + (configuration.asInt('watchDogPause', section: section) ?? 60).toString(), + configuration.asString('dataDirectory', section: section) ?? 'data', + configuration + .asInt('answerLength', section: section2, defaultValue: 200) + .toString(), + configuration.asString('db', section: section3) ?? 'apppolladm', + configuration.asString('user', section: section3) ?? 'apppolladm', + configuration.asString('code', section: section3) ?? 'TopSecret', + configuration.asString('host', section: section3) ?? 'localhost', + configuration + .asInt('port', section: section3, defaultValue: 3306) + .toString(), + configuration + .asInt('timeout', section: section3, defaultValue: 30) + .toString(), + configuration + .asInt('traceDataLength', section: section3, defaultValue: 3306) + .toString(), + configuration.asInt('clientSessionTimeout', defaultValue: 900).toString(), + serviceName, + ].join('\t'); + return rc; + } +} +enum SqlStatementType { + insert, list, query, update, delete +} +class SqlException extends FormatException{ + final String data; + SqlException(this.data); + @override + String toString() => 'SqlException: $data'; +} +class SqlStatement { + final String name; + final List parameters; + final String sql; + final SqlStatementType type; + final SqlModule parent; + SqlStatement(this.name, this.parameters, this.sql, this.type, this.parent); + /// Returns a SQL statement and a parameter list (positional parameters). + /// [map]: the current parameters as named parameters: : + /// [parameters]: OUT the positional parameters + String sqlStatement(Map map, List parameters){ + for (var parameter in parameters){ + if (! map.containsKey(parameter)) { + throw SqlException('${toString()}: missing parameter "$parameter"'); + } else { + parameters.add(map[parameter]); + } + } + return sql; + } + @override + String toString(){ + String rc = '${parent.name}.$name'; + return rc; + } +} +class SqlModule{ + final String name; + final Map sqlStatements = {}; + final SqlStorage parent; + SqlModule(this.name, this.parent); + /// Adds a statement to the map. + void add(SqlStatement statement) { + if (sqlStatements.containsKey(statement.name)){ + throw SqlException('module $name contains already a statement "${statement.name}"'); + } + sqlStatements[statement.name] = statement; + } +} +class SqlStorage{ + final BaseLogger logger; + Map modules = {}; + SqlStorage(this.logger); + void readModule(Map map, String filename){ + String moduleName = ''; + if (map.containsKey('module')){ + moduleName = map['module']; + } else { + logger.error('$filename: missing "module"'); + } + if (! modules.containsKey(moduleName)){ + modules[moduleName] = SqlModule(moduleName, this); + } + final module = modules[moduleName]; + for (var name in map.keys){ + switch(name){ + case 'module': + // already done. + break; + default: + final map2 = map[name]; + if (map2 is! Map){ + logger.error('$filename: "$name" is not a map'); + } else if (! map2.containsKey('type')){ + logger.error('$filename: "$name": missing type'); + } else if (! map2.containsKey('parameters')){ + logger.error('$filename: "$name": missing parameters'); + } else if (! map2.containsKey('sql')){ + logger.error('$filename: "$name": missing sql'); + } + final type = map2['type']; + final parameters = map2['parameters']; + final sql = map2['sql']; + if (type is! String){ + logger.error('$filename: "$name": type is not a string'); + } else if (parameters is! Iterable){ + logger.error('$filename: "$name": type is not an array'); + } else if (sql is! String){ + logger.error('$filename: "$name": type is not a string'); + } else { + final parameters2 = []; + int no = -1; + for (var item in parameters){ + no++; + if (item is! String){ + logger.error('$filename: "$name": parameter[$no] is not a string'); + } else { + + } + } + var type2 = stringToEnum(type, SqlStatementType.values); + if (type2 == null){ + logger.error('$filename: "$name": unknown type: $type. Using "query"'); + type2 = SqlStatementType.query; + } + modules[moduleName]!.add(SqlStatement(name, parameters2, sql, type2, module!)); + } + } + } + } +} diff --git a/rest_server/lib/services.dart b/rest_server/lib/services.dart new file mode 100644 index 0000000..da5a256 --- /dev/null +++ b/rest_server/lib/services.dart @@ -0,0 +1,79 @@ +import 'rest_server.dart'; + +import 'package:args/args.dart'; + +/// Parses the program arguments and executes the services. +Future run(List args) async { + var rc = 0; + final parser = ArgParser(); + parser.addFlag('help', + abbr: 'h', negatable: false, help: 'Print this message.'); + parser.addFlag('example', + abbr: 'e', negatable: false, help: 'Displays a example configuration.'); + parser.addOption('configuration', + abbr: 'c', + help: 'The configuration file.', + defaultsTo: '/etc/exhibition/rest_server.yaml'); + try { + final results = parser.parse(args); + if (results['help']) { + try { + try { + print('''Usage rest_server [mode] [options] + Starts a REST server answering the client database requests. +: + daemon + Starts the REST server. This is the default behaviour. + install [ []] + Installs the program. + uninstall [] + Uninstalls the program. + version + Displays the version. +