--- /dev/null
+import 'dart:async';
+import 'dart:convert' as convert;
+import 'dart:io';
+import 'dart:isolate';
+
+import 'package:dart_bones/dart_bones.dart';
+import 'package:http/http.dart' as http;
+import 'package:crypto/crypto.dart';
+import 'package:path/path.dart' as path;
+import 'package:args/args.dart';
+
+const forbidden = 'forbidden';
+const wrongData = 'wrong data';
+const wrongParameters = 'wrong parameters';
+
+/// Returns the MD5 hash of [text].
+String buildMd5Hash(String text) {
+ final rc = md5.convert(convert.utf8.encode(text)).toString();
+ return rc;
+}
+
+/// Starts an isolate instance with the channel [isolateToMainStream].
+void runIsolate(SendPort isolateToMainStream) {
+ var mainToIsolateStream = ReceivePort();
+ isolateToMainStream.send(mainToIsolateStream.sendPort);
+
+ mainToIsolateStream.listen((data) {
+ if (data is String && data.startsWith('WorkerParameter')) {
+ final params = WorkerParameters.fromString(data);
+ final worker =
+ ServiceWorker(params.id, params.configuration, params.serviceName);
+ if (params.id == 1) {
+ worker.observe();
+ } else {
+ worker.listen();
+ }
+ }
+ });
+
+ isolateToMainStream.send('This is from myIsolate()');
+}
+
+/// Implements a modified REST server.
+/// That is a HTTP server serving only the method POST (for queries and storages).
+/// The server is multi threaded by isolates.
+class RestServer {
+ static const _version = 'V2021.06.29.00';
+ static BaseLogger? unittestLogger;
+ BaseLogger logger = MemoryLogger(LEVEL_FINE);
+ BaseConfiguration configuration = BaseConfiguration({}, globalLogger);
+ var clientSessionTimeout = 15 * 60;
+ String serviceName = 'exhibition';
+ RestServer(
+ int port,
+ this.logger, {
+ String address = 'localhost',
+ int sessionTimeout = 300,
+ int answerDumpLength = 80,
+ int dataDumpLength = 200,
+ String db = 'appexhibition',
+ String dbUser = 'exhibition',
+ String dbCode = 'TopSecret',
+ String dbHost = 'localhost',
+ int dbPort = 3306,
+ int watchDogPause = 60,
+ int traceDataLength = 80,
+ clientSessionTimeout = 15 * 60,
+ String dataDirectory = 'data',
+ }) {
+ if (unittestLogger != null) {
+ logger = unittestLogger!;
+ }
+ configuration = BaseConfiguration({
+ 'service': {
+ 'port': port,
+ 'watchDogPause': watchDogPause,
+ 'address': address,
+ 'dataDirectory': dataDirectory,
+ 'threads': Platform.numberOfProcessors,
+ },
+ 'trace': {
+ 'answerLength': answerDumpLength,
+ 'sqlLength': dataDumpLength,
+ },
+ 'db': {
+ 'db': db,
+ 'user': dbUser,
+ 'code': dbCode,
+ 'host': dbHost,
+ 'port': dbPort,
+ 'traceDataLength': traceDataLength,
+ },
+ 'clientSessionTimeout': clientSessionTimeout,
+ }, logger);
+ }
+
+ /// Konstruktor, der die Parameter aus der YAML-Konfigurationsdatei
+ /// [filename] bezieht.
+ ///
+ RestServer.fromConfig(String filename, {this.serviceName = 'exhibition'}) {
+ final logger2 = unittestLogger ?? MemoryLogger();
+ configuration = Configuration.fromFile(filename, logger2);
+ final logFile = configuration.asString('logFile',
+ section: 'service',
+ defaultValue: '/var/log/local/$serviceName.log') ??
+ '';
+ final level =
+ configuration.asInt('logLevel', section: 'service') ?? LEVEL_SUMMERY;
+ logger = unittestLogger ?? Logger(logFile, level);
+ globalLogger = logger;
+ ProcessSync(logger);
+ FileSync.initialize(logger);
+ logger.log(
+ 'db: ${configuration.asString('db', section: 'db')}', LEVEL_DETAIL);
+ }
+
+ /// Initialisiert eine Isolate-Instanz.
+ Future<SendPort> initIsolate() async {
+ Completer completer = Completer<SendPort>();
+ var isolateToMainStream = ReceivePort();
+
+ isolateToMainStream.listen((data) {
+ if (data is SendPort) {
+ var mainToIsolateStream = data;
+ completer.complete(mainToIsolateStream);
+ }
+ });
+ await Isolate.spawn(runIsolate, isolateToMainStream.sendPort);
+ return completer.future as dynamic;
+ }
+
+ /// Startet die Isolate-Instanzen des Webservers.
+ void run() async {
+ final cpus = Platform.numberOfProcessors;
+ var count = configuration.asInt('threads', section: 'service') ?? cpus;
+ count = count <= 0 ? 1 : count;
+ for (var ix = 1; ix <= count; ix++) {
+ var mainToIsolateStream = await initIsolate();
+ mainToIsolateStream.send(
+ WorkerParameters(ix + 1, configuration, serviceName).asString());
+ }
+ final mainToIsolateStream = await initIsolate();
+ // der letzte Worker ist der Observer:
+ mainToIsolateStream
+ .send(WorkerParameters(1, configuration, serviceName).asString());
+ }
+
+ /// Starts the daemon process.
+ /// [args] contains the program arguments service name and user,
+ /// e.g. ['pollsam', 'pollsam'].
+ /// [results] contains the options.
+ static void daemon(List<String> args, ArgResults results) {
+ final service = args.isEmpty ? 'pollsam' : args[0];
+ var filename = results['configuration'];
+ if (filename == '/etc/pollsam/pollsam.yaml' && service != 'pollsam') {
+ filename = '/etc/pollsam/$service.yaml';
+ }
+ if (!File(filename).existsSync()) {
+ print('+++ missing $filename');
+ unittestLogger?.error('missing configuration: $filename');
+ } else {
+ final server = RestServer.fromConfig(filename, serviceName: service);
+ server.run();
+ }
+ }
+
+ static String exampleConfiguration() {
+ final rc = '''---
+# Example configuration file for the rest server. Created by rest_server.
+service:
+ address: 0.0.0.0
+ port: 58021
+ dataDirectory: /var/cache/rest_server/data
+ threads: 2
+ watchDogPause: 60
+ # logFile: /var/log/local/pollsam.log
+trace:
+ answerLength: 200
+db:
+ db: apppolladm
+ user: jonny
+ code: "Top Secret"
+ host: localhost
+ port: 3306
+ timeout: 30
+ traceDataLength: 200
+clientSessionTimeout: 900
+# put this content to /etc/pollsam/pollsam.yaml
+''';
+ return rc;
+ }
+
+ /// Installs the application as systemd service.
+ static void install(List<String> args) {
+ final logger = globalLogger;
+ final service = OsService(logger);
+ final userInfo = UserInfo();
+ final processSync = ProcessSync.initialize(logger);
+ final fileSync = FileSync.initialize(logger);
+ if (!userInfo.isRoot) {
+ logger.error('Be root!');
+ } else {
+ final appName = args.length >= 2 ? args[1] : 'pollsam';
+ var executable =
+ path.absolute(args.length > 1 ? args[0] : Platform.executable);
+ if (!executable.startsWith(Platform.pathSeparator)) {
+ executable = processSync.executeToString('which', [executable]).trim();
+ }
+ final fnConfig = '/etc/pollsam/$appName.yaml';
+ fileSync.ensureDirectory(path.dirname(fnConfig));
+ if (File(fnConfig).existsSync()) {
+ logger.log('= $fnConfig already exists.');
+ } else {
+ logger.log('= creating $fnConfig');
+ fileSync.toFile(fnConfig, exampleConfiguration());
+ }
+ service.installService(
+ appName,
+ starter: executable,
+ user: appName,
+ group: appName,
+ description: 'A REST server serving the POLLECTOR project',
+ );
+ }
+ }
+
+ /// Uninstalls the application as systemd service.
+ static void uninstall(List<String> args) {
+ final logger = BaseLogger(LEVEL_DETAIL);
+ ProcessSync.initialize(logger);
+ final appName = args.isEmpty ? 'pollsam' : args[0];
+ final service = OsService(logger);
+ service.uninstallService(appName, user: appName, group: appName);
+ }
+
+ static String version() => _version;
+}
+
+/// Datenklasse für eine Isolate-Instanz.
+class ServiceWorker {
+ /// for unittests:
+ static int maxRequests = 0;
+ String nextBody = '';
+ final BaseConfiguration configuration;
+ final int threadId;
+ final String serviceName;
+ int clientSessionTimeout = 30;
+ BaseLogger logger = globalLogger;
+ bool requestedCodecIsLatin1 = false;
+ HttpRequest? currentRequest;
+ String what = '';
+ MySqlDb? db;
+ String restVersion = '';
+ FileSync? _fileSync = FileSync();
+
+ ServiceWorker(this.threadId, this.configuration, this.serviceName) {
+ final fnLog = '/var/log/local/$serviceName.$threadId.log';
+ logger = RestServer.unittestLogger ??
+ Logger(fnLog,
+ configuration.asInt('logLevel', section: 'service') ?? LEVEL_FINE);
+ logger.log('db: ${configuration.asString('db', section: 'db')}');
+ db = MySqlDb.fromConfiguration(configuration, logger);
+ clientSessionTimeout = configuration.asInt('clientSessionTimeout') ?? 30;
+ _fileSync = FileSync(logger);
+ }
+
+ /// Checks whether a valid connection is available. If not a reconnection is
+ /// done.
+ Future checkConnection() async {
+ var ready = false;
+ final sql = 'select count(*) from loginusers;';
+ int? count;
+ try {
+ count = await db!.readOneInt(sql);
+ } catch (exc) {
+ logger.error(exc.toString());
+ ready = true;
+ }
+ if (!ready && (count ?? 0) <= 0) {
+ logger.log('Datenbank wird neu verbunden', LEVEL_DETAIL);
+ db = MySqlDb.fromConfiguration(configuration, logger);
+ await db?.connect();
+ }
+ }
+
+ /// Prüft die Session-ID.
+ /// Liefert true, wenn OK, false sonst.
+ Future<bool> checkSession(Map<String, dynamic> parameters) async {
+ var rc = false;
+ if (parameters.containsKey('sessionid')) {
+ final id = parameters['sessionid'];
+ final sql = '''SELECT
+ connection_id, connection_apiaristid, connection_start
+FROM connections
+WHERE
+ connection_name=?
+ AND connection_start >= NOW() - INTERVAL $clientSessionTimeout SECOND
+;
+''';
+ final record = await db!.readOneAsMap(sql, params: [id]);
+ if (record == null) {
+ logger.log('ungültige Session: $id', LEVEL_DETAIL);
+ } else {
+ logger.log(
+ 'Session: $id apiarist: ${record['connection_apiaristid']} start: ${record['connection_start']}',
+ LEVEL_DETAIL);
+ rc = true;
+ final sql2 = '''UPDATE connections SET
+ connection_requests=connection_requests+1,
+ connection_end=NOW()
+WHERE
+ connection_name=?
+;
+''';
+ await db!.updateOne(sql2, params: [id]);
+ }
+ }
+ return rc;
+ }
+
+ /// Returns the content of a POST request as map.
+ Future<dynamic> contentAsMap() async {
+ var content = await contentOf();
+ Map map;
+ if (content.isEmpty) {
+ map = {};
+ } else {
+ try {
+ map = convert.jsonDecode(content) as Map;
+ } on FormatException catch (exc) {
+ logger.error('json decoding failed:\ncontent\n$exc');
+ rethrow;
+ }
+ }
+ return map;
+ }
+
+ /// Fetches the content of the POST request as String.
+ Future<String> contentOf() async {
+ String content;
+ if (nextBody.isNotEmpty) {
+ content = nextBody;
+ } else {
+ var data = requestedCodecIsLatin1
+ ? convert.latin1.decoder.bind(currentRequest!)
+ : convert.utf8.decoder.bind(currentRequest!);
+ content = await data.join();
+ }
+ return content;
+ }
+
+ /// Returns the content of the file [node] in the [directory]
+ /// or null, if that file does not exist.
+ Future<String?> getFile(String directory, String node) async {
+ final fn = path.join(directory, node);
+ final file = File(fn);
+ final rc = await file.exists() ? null : await file.readAsString();
+ return rc;
+ }
+
+ /// Bearbeitet die Anforderung 'hives':
+ /// Liefert Map mit Bienenstockinfo oder [forbidden], wenn Session-ID unbekannt.
+ Future<String> queryData(Map<String, dynamic> parameters) async {
+ String rc;
+ final sql = '''SELECT *
+FROM hives
+WHERE hive_apiaryid=(
+ SELECT apiarist_apiaryid FROM apiarists
+ WHERE apiarist_id=(
+ SELECT connection_apiaristid FROM connections
+ WHERE connection_name=?)
+ )
+ORDER BY hive_name
+;
+''';
+ final params = [parameters['sessionid']];
+ final records = await db!.readAll(sql, params: params);
+ if (records == null) {
+ rc = wrongData;
+ } else {
+ final list = <Map<String, dynamic>>[];
+ records.forEach((record) => list.add({
+ 'hiveid': record['hive_id'],
+ 'name': record['hive_name'],
+ 'lat': record['hive_latitude'],
+ 'long': record['hive_longitude']
+ }));
+ rc = convert.jsonEncode({'list': list});
+ }
+ return rc;
+ }
+
+ /// Bearbeitet die Anforderung 'sessionid':
+ /// Prüfen der Lizenz ("Token"). Wenn erfolgreich, wird die Session
+ /// in die DB eingetragen.
+ /// Liefert eine JSon-Map (als String) oder null, wenn Fehler.
+ Future<String?> getSessionId(Map<String, dynamic> parameters) async {
+ String? rc;
+ var sql = '''SELECT apiarist_id
+FROM apiarists
+WHERE apiarist_token=?;
+''';
+ final token = parameters['token'];
+ final params = [token];
+ final apiaristId = await db!.readOneInt(sql, params: params);
+ if (apiaristId == null) {
+ logger.log('unknown token: $token', LEVEL_DETAIL);
+ } else {
+ sql = '''INSERT
+ INTO connections
+ (connection_name, connection_apiaristid, connection_start, connection_end, connection_requests)
+ VALUES (?, ?, NOW(), NOW(), 0);
+ ''';
+
+ /// Use only 31 bit (non negativ numbers on 32 bit clients):
+ final sessionId = int.parse(
+ buildMd5Hash(
+ token + DateTime.now().microsecondsSinceEpoch.toString())
+ .substring(0, 8),
+ radix: 16) &
+ 0x7fffffff;
+ final params2 = [sessionId, apiaristId];
+ final id = await db!.insertOne(sql, params: params2);
+ if (id <= 0) {
+ logger.error('insert into connection failed: $token');
+ } else {
+ logger.log('sessionid: $sessionId', LEVEL_DETAIL);
+ rc = convert.jsonEncode({'sessionid': sessionId});
+ }
+ }
+ return rc;
+ }
+
+ /// Liefert zu einem [name] das SQL-Statement.
+ /// [paramCount] ist die Anzahl der Parameter. Dient zur Konsistenzprüfung.
+ /// Liefert null oder das SQL-Statement
+ String? getSql(String name, int paramCount) {
+ String? sql;
+ var expectedCount = 0;
+ switch (name) {
+ case 'hives':
+ sql = '''SELECT
+ *
+FROM hives
+WHERE hive_apiaryid=:id
+''';
+ expectedCount = 1;
+ break;
+ case 'apiaristByToken':
+ sql = '''SELECT
+ *
+FROM apiary
+WHERE
+ apiarist_token=:token
+''';
+ expectedCount = 1;
+ break;
+ default:
+ logger.error('unknown SQL name: $name');
+ break;
+ }
+ if (expectedCount != paramCount) {
+ logger.error(
+ 'unexpected parameter count in $name: $paramCount instead of $expectedCount');
+ }
+ return sql;
+ }
+
+ /// Bearbeitet die Anforderung 'register':
+ /// Test, ob der Name in den Parametern bei den Imkern existiert.
+ /// Wenn ja, wird der Token geliefert, sonst null.
+ Future<String> getToken(Map<String, dynamic> parameters) async {
+ String rc;
+ final sql = '''SELECT apiarist_token
+FROM apiarists
+WHERE apiarist_registername=?;
+''';
+ final name = parameters['name'];
+ final params = [name];
+ final token = await db!.readOneString(sql, params: params);
+ if (token == null) {
+ logger.log('unknown name: $name', LEVEL_DETAIL);
+ rc = wrongData;
+ } else {
+ rc = convert.jsonEncode({'token': token});
+ }
+ return rc;
+ }
+
+ /// Handles a POST request.
+ /// Extracts the request name and calls the matching method.
+ /// Returns the answer that should be sent to the client: A string or a map.
+ Future<String?> handlePost() async {
+ final parameters = stripDelimiters(await contentAsMap());
+ var withSession = false;
+ String? rc;
+ if (what != 'watchdog') {
+ await checkConnection();
+ }
+ try {
+ if (withSession && what != 'sessionid' &&
+ what != 'register' &&
+ what != 'watchdog' &&
+ !await checkSession(parameters)) {
+ rc = forbidden;
+ } else {
+ switch (what) {
+ case 'watchdog':
+ rc = await watchdog(parameters);
+ break;
+ case 'sessionid':
+ rc = await getSessionId(parameters);
+ break;
+ case 'store':
+ rc = await storeData(parameters);
+ break;
+ case 'query':
+ rc = await queryData(parameters);
+ break;
+ case 'register':
+ rc = await getToken(parameters);
+ break;
+ default:
+ logger.error('unknown request: $what');
+ break;
+ }
+ }
+ } on Exception catch (exc) {
+ logger.error('exception thrown: $exc');
+ }
+ return rc;
+ }
+
+ /// Waits for requests of the [server] and calls the handlers.
+ Future handleRequest(HttpRequest request) async {
+ currentRequest = request;
+ try {
+ switch (request.method) {
+ case 'POST':
+ prepareResource();
+ final answer = await handlePost();
+ await writeAnswer(answer, request);
+ break;
+ default:
+ logger.log('unexpected METHOD: ${request.method}', LEVEL_FINE);
+ break;
+ }
+ } on Error catch (exc2, trace) {
+ logger.error('$exc2', stackTrace: trace);
+ } on Exception catch (exc, trace) {
+ logger.error('$exc', stackTrace: trace);
+ }
+ }
+
+ /// Does asynchronous initializations.
+ /// @precondition: Must be called after the constructor!
+ Future initAsync() async {
+ await db!.connect();
+ }
+
+ /// Connects to a listening address/port and waits for requests.
+ Future listen() async {
+ var doService = true;
+ final port = configuration.asInt('port', section: 'service') ?? 58011;
+ final address = configuration.asString('address',
+ section: 'service', defaultValue: '0.0.0.0');
+
+ try {
+ await initAsync();
+ var server = await HttpServer.bind(address, port, shared: true);
+ logger.log('[$threadId]: Listening on $address:$port');
+ var requestNo = 0;
+ if (doService) {
+ await for (HttpRequest request in server) {
+ await handleRequest(request);
+ await request.response.close();
+ if (!doService || maxRequests > 0 && ++requestNo > maxRequests) {
+ break;
+ }
+ }
+ }
+ } on Exception catch (exc) {
+ logger.error(exc.toString());
+ }
+ logger.log('thread $threadId stopped');
+ }
+
+ /// Prüft, ob ein Isolate auf eine Watchdog-Anfrage antwortet.
+ /// Hintergrund: Wenn eine DB-Verbindung abbricht, tritt beim nächsten
+ /// DB-Zugriff eine SocketException auf, die nicht abgefangen werden kann.
+ /// Der Thread (Isolate) ist dann tot.
+ /// Daher wird vom Observer-Thread regelmäßig eine Anfrage verschickt.
+ /// Kommt keine Antwort, ist kein Empfangsthread mehr erreichbar, das Programm
+ /// wird mittels exit() abgebrochen, damit es von SystemD erneut gestartet
+ /// wird.
+ Future observe() async {
+ final duration = Duration(
+ seconds:
+ configuration.asInt('watchDogPause', section: 'service') ?? 60);
+ logger.log('watchdog pause: ${duration.inSeconds}', LEVEL_DETAIL);
+ var counter = 0;
+ while (true) {
+ await Future.delayed(duration);
+ final answer = await runRequest('watchdog', body: '{"wait": 1}');
+ if (++counter == 1) {
+ logger.log('first watchdog request: $answer', LEVEL_DETAIL);
+ }
+ if (answer != 'OK') {
+ logger.error('watchdog failed: aborting! [$answer]');
+ // Wir beenden das Programm, damit es von SystemD neu gestartet wird.
+ exit(1);
+ }
+ }
+ }
+
+ /// Ermittelt die Anfragenklasse ("what"), die Teil der URI ist.
+ void prepareResource({bool withId = true}) {
+ if (!(currentRequest?.requestedUri.path.contains('watchdog') ?? false)) {
+ logger.log(currentRequest!.requestedUri.path, LEVEL_FINE);
+ }
+ var list = currentRequest!.requestedUri.path.split('/');
+ if (list.isNotEmpty && list[0].isEmpty) {
+ list = list.sublist(1);
+ }
+ // URI: /<what>/<version>
+ if (list.length < 2) {
+ throw Exception('illegal request URI: ${list.join("/")}');
+ }
+ what = list[0];
+ restVersion = list[1];
+ }
+
+ /// Fordert per POST eine Aktion an.
+ /// [what] legt die Aktion fest.
+ /// [body] ist der Text, der mit der Anfrage mitgeliefert wird.
+ /// [headers] sind HTTP-Headerkomponenten.
+ /// Liefert die Antwort der Anfrage.
+ Future<String> runRequest(String what,
+ {String? body, Map<String, String>? headers}) async {
+ final port = configuration.asInt('port', section: 'service') ?? 58011;
+ var rc = '';
+ final uri =
+ Uri(scheme: 'http', host: 'localhost', port: port, path: '/$what/1.0');
+ http.Response response;
+ // logger.log('request: POST $uri', LEVEL_LOOP);
+ try {
+ response = await http.post(uri, body: body, headers: headers);
+ // logger.log('status: ${response.statusCode}', LEVEL_LOOP);
+ if (response.statusCode != 200) {
+ logger.error('$uri: status: ${response.statusCode}');
+ } else {
+ rc = response.body;
+ }
+ } on SocketException catch (exc) {
+ logger.error('$exc');
+ rc = '+ $exc';
+ } on Exception catch (exc) {
+ logger.error('$exc');
+ }
+ return rc;
+ }
+
+ void setRequestData(String what, String restVersion) {
+ this.what = what;
+ this.restVersion = restVersion;
+ }
+
+ /// Speichert [content] im Verzeichnis [directory] unter dem Namen [node].
+ Future storeFile(String directory, String node, String content) async {
+ if (directory.isEmpty) {
+ logger.error('storeFile: missing data directory');
+ } else {
+ _fileSync!.ensureDirectory(directory);
+ final fn = path.join(directory, node);
+ await File(fn).writeAsString(content);
+ }
+ }
+
+ /// Bearbeitet die Anforderung 'sample': Speichern einer Probe.
+ Future<String> storeData(Map<String, dynamic> parameters) async {
+ String rc;
+ if (!testParam(
+ ['module', 'sql', 'sampleid', 'sessionid'], parameters, 'storeData')) {
+ rc = wrongParameters;
+ } else {
+
+ final sql = '''INSERT
+ INTO samples
+ (sample_code, sample_timestamp, sample_hiveid, sample_uuid, sample_connection,
+ sample_apiaristid, sample_raw, sample_encoding, created, createdby)
+ VALUES (?, ?, ?, ?, ?,
+ (SELECT MAX(connection_apiaristid) FROM connections WHERE connection_name=?),
+ ?, ?, NOW(), 'POLLSAM');
+ ''';
+ final connection = parameters['sessionid'];
+ final params = [
+ parameters['sampleid'],
+ parameters['time'],
+ parameters['hiveid'],
+ parameters['uuid'],
+ connection,
+ connection,
+ parameters.containsKey('raw') ? parameters['raw'] : null,
+ parameters.containsKey('encoding') ? parameters['encoding'] : null,
+ ];
+ final id = await db!.insertOne(sql, params: params);
+ if (id <= 0) {
+ logger.error('insert into samples failed: ${parameters['uuid']}');
+ } else {
+ logger.log('sample: $id', LEVEL_DETAIL);
+ }
+ final hive = parameters['hiveid'].toString();
+ final data = <String, String>{
+ 'uuid': parameters['uuid'],
+ 'time': parameters['time'],
+ 'sampleid': parameters['sampleid'],
+ 'hiveid': hive,
+ };
+ final content = convert.jsonEncode(data);
+ await storeFile(
+ configuration.asString(
+ 'dataDirectory',
+ section: 'service',
+ ) ??
+ '',
+ parameters['uuid'] as String,
+ content);
+ rc = 'OK';
+ }
+ return rc;
+ }
+
+ /// Liefert eine Kopie von [source] mit Schlüsseln und Werten ohne Delimiter.
+ /// Beispiel: { "'a'": '"1"' } => { "a": '1' }
+ Map<String, dynamic> stripDelimiters(Map<String, dynamic> source) {
+ final rc = <String, dynamic>{};
+ for (var key in source.keys) {
+ var value = source[key];
+ final valueLength = value is String ? value.length : 0;
+ final value2 = valueLength == 0 ? null : (value as String);
+ if (valueLength > 0 &&
+ value2 != null &&
+ (value2.startsWith('"') || value2.startsWith("'")) &&
+ value2.endsWith(value2[0])) {
+ value = value2.substring(1, value2.length - 1);
+ }
+ if ((key.startsWith('"') || key.startsWith("'")) &&
+ key.length > 2 &&
+ key.endsWith(key[0])) {
+ key = key.substring(1, key.length - 1);
+ }
+ rc[key] = value;
+ }
+ return rc;
+ }
+
+ /// Tests, whether the [map] contains given [keys].
+ /// Return true, if all keys are available.
+ bool testParam(List<String> keys, Map<String, dynamic> map, String caller) {
+ var rc = true;
+ for (var key in keys) {
+ if (!map.containsKey(key)) {
+ logger.error('missing key "$key" in $caller');
+ rc = false;
+ }
+ }
+ return rc;
+ }
+
+ /// Returns 'OK' after a given timeout.
+ /// The timeout is used to request other instances (isolates) (listening on
+ /// shared ports).
+ /// Returns 'OK' after a given timeout.
+ Future<String> watchdog(Map<String, dynamic> parameters) async {
+ final rc = 'OK';
+ final wait = parameters['wait'];
+ await Future.delayed(Duration(seconds: wait), () => '1');
+ return rc;
+ }
+
+ /// Writes an [answer] to the client via [request].
+ Future writeAnswer(answer, request) async {
+ if (answer != null) {
+ if (answer is Iterable || answer is Map) {
+ answer = convert.jsonEncode(answer);
+ }
+ if (logger.logLevel >= LEVEL_FINE &&
+ !request.requestedUri.path.contains('watchdog')) {
+ logger.log('[$threadId]: answer: ' +
+ limitString(
+ answer, configuration.asInt('answerLength', section: 'trace')));
+ }
+ // Preventing "XMLHttpRequest error"
+ // do not use with credentials!
+ request.response.headers.add('Access-Control-Allow-Origin', '*');
+ request.response
+ ..statusCode = HttpStatus.ok
+ ..write(answer);
+ await request.response.close();
+ }
+ }
+}
+
+/// Data class for storage of configuration data of a [ServiceWorker].
+class WorkerParameters {
+ int id;
+ BaseConfiguration configuration;
+ String serviceName;
+
+ WorkerParameters(this.id, this.configuration, this.serviceName);
+
+ /// Decodes a string built with asString() into an instance.
+ factory WorkerParameters.fromString(String data) {
+ final parts = data.split('\t');
+ const baseService = 2;
+ const baseTrace = baseService + 4;
+ const baseDb = baseTrace + 1;
+ const baseRest = baseDb + 7;
+ final configuration = BaseConfiguration({
+ 'service': {
+ 'address': parts[baseService],
+ 'port': parts[baseService + 1],
+ 'watchDogPause': parts[baseService + 2],
+ 'dataDirectory': parts[baseService + 3]
+ },
+ 'trace': {
+ 'answerLength': int.parse(parts[baseTrace]),
+ },
+ 'db': {
+ 'db': parts[baseDb],
+ 'user': parts[baseDb + 1],
+ 'code': parts[baseDb + 2],
+ 'host': parts[baseDb + 3],
+ 'port': int.parse(parts[baseDb + 4]),
+ 'timeout': int.parse(parts[baseDb + 5]),
+ 'traceDataLength': int.parse(parts[baseDb + 6]),
+ },
+ 'clientSessionTimeout': int.parse(parts[baseRest]),
+ }, MemoryLogger());
+ return WorkerParameters(
+ int.parse(parts[1]), configuration, parts[baseRest + 1]);
+ }
+
+ /// Encodes the relevant instance data into a string.
+ /// Decode that with fromString().
+ String asString() {
+ const section = 'service';
+ const section2 = 'trace';
+ const section3 = 'db';
+ final rc = <String>[
+ 'WorkerParameters',
+ id.toString(),
+ configuration.asString('address', section: section) ?? 'localhost',
+ (configuration.asInt('port', section: section) ?? 58011).toString(),
+ (configuration.asInt('watchDogPause', section: section) ?? 60).toString(),
+ configuration.asString('dataDirectory', section: section) ?? 'data',
+ configuration
+ .asInt('answerLength', section: section2, defaultValue: 200)
+ .toString(),
+ configuration.asString('db', section: section3) ?? 'apppolladm',
+ configuration.asString('user', section: section3) ?? 'apppolladm',
+ configuration.asString('code', section: section3) ?? 'TopSecret',
+ configuration.asString('host', section: section3) ?? 'localhost',
+ configuration
+ .asInt('port', section: section3, defaultValue: 3306)
+ .toString(),
+ configuration
+ .asInt('timeout', section: section3, defaultValue: 30)
+ .toString(),
+ configuration
+ .asInt('traceDataLength', section: section3, defaultValue: 3306)
+ .toString(),
+ configuration.asInt('clientSessionTimeout', defaultValue: 900).toString(),
+ serviceName,
+ ].join('\t');
+ return rc;
+ }
+}
+enum SqlStatementType {
+ insert, list, query, update, delete
+}
+class SqlException extends FormatException{
+ final String data;
+ SqlException(this.data);
+ @override
+ String toString() => 'SqlException: $data';
+}
+class SqlStatement {
+ final String name;
+ final List<String> parameters;
+ final String sql;
+ final SqlStatementType type;
+ final SqlModule parent;
+ SqlStatement(this.name, this.parameters, this.sql, this.type, this.parent);
+ /// Returns a SQL statement and a parameter list (positional parameters).
+ /// [map]: the current parameters as named parameters: <name>: <value>
+ /// [parameters]: OUT the positional parameters
+ String sqlStatement(Map map, List<String> parameters){
+ for (var parameter in parameters){
+ if (! map.containsKey(parameter)) {
+ throw SqlException('${toString()}: missing parameter "$parameter"');
+ } else {
+ parameters.add(map[parameter]);
+ }
+ }
+ return sql;
+ }
+ @override
+ String toString(){
+ String rc = '${parent.name}.$name';
+ return rc;
+ }
+}
+class SqlModule{
+ final String name;
+ final Map<String, SqlStatement> sqlStatements = {};
+ final SqlStorage parent;
+ SqlModule(this.name, this.parent);
+ /// Adds a statement to the map.
+ void add(SqlStatement statement) {
+ if (sqlStatements.containsKey(statement.name)){
+ throw SqlException('module $name contains already a statement "${statement.name}"');
+ }
+ sqlStatements[statement.name] = statement;
+ }
+}
+class SqlStorage{
+ final BaseLogger logger;
+ Map<String, SqlModule> modules = {};
+ SqlStorage(this.logger);
+ void readModule(Map map, String filename){
+ String moduleName = '<unknown>';
+ if (map.containsKey('module')){
+ moduleName = map['module'];
+ } else {
+ logger.error('$filename: missing "module"');
+ }
+ if (! modules.containsKey(moduleName)){
+ modules[moduleName] = SqlModule(moduleName, this);
+ }
+ final module = modules[moduleName];
+ for (var name in map.keys){
+ switch(name){
+ case 'module':
+ // already done.
+ break;
+ default:
+ final map2 = map[name];
+ if (map2 is! Map){
+ logger.error('$filename: "$name" is not a map');
+ } else if (! map2.containsKey('type')){
+ logger.error('$filename: "$name": missing type');
+ } else if (! map2.containsKey('parameters')){
+ logger.error('$filename: "$name": missing parameters');
+ } else if (! map2.containsKey('sql')){
+ logger.error('$filename: "$name": missing sql');
+ }
+ final type = map2['type'];
+ final parameters = map2['parameters'];
+ final sql = map2['sql'];
+ if (type is! String){
+ logger.error('$filename: "$name": type is not a string');
+ } else if (parameters is! Iterable){
+ logger.error('$filename: "$name": type is not an array');
+ } else if (sql is! String){
+ logger.error('$filename: "$name": type is not a string');
+ } else {
+ final parameters2 = <String>[];
+ int no = -1;
+ for (var item in parameters){
+ no++;
+ if (item is! String){
+ logger.error('$filename: "$name": parameter[$no] is not a string');
+ } else {
+
+ }
+ }
+ var type2 = stringToEnum(type, SqlStatementType.values);
+ if (type2 == null){
+ logger.error('$filename: "$name": unknown type: $type. Using "query"');
+ type2 = SqlStatementType.query;
+ }
+ modules[moduleName]!.add(SqlStatement(name, parameters2, sql, type2, module!));
+ }
+ }
+ }
+ }
+}