...
 
Commits (4)
# Generated from CLion C/C++ Code Style settings
BasedOnStyle: LLVM BasedOnStyle: LLVM
Language: Cpp Language: Cpp
DerivePointerAlignment: true DerivePointerAlignment: true
......
...@@ -42,19 +42,96 @@ namespace pEp { ...@@ -42,19 +42,96 @@ namespace pEp {
} }
namespace Adapter { namespace Adapter {
// private
SyncModes _sync_mode = SyncModes::Async;
::messageToSend_t _messageToSend = nullptr; ::messageToSend_t _messageToSend = nullptr;
::notifyHandshake_t _notifyHandshake = nullptr; ::notifyHandshake_t _notifyHandshake = nullptr;
bool _adapter_manages_sync_thread = false;
::inject_sync_event_t _inject_action = _queue_sync_event;
std::thread _sync_thread; std::thread _sync_thread;
::utility::locked_queue<SYNC_EVENT, ::free_Sync_event> sync_evt_q; ::utility::locked_queue<SYNC_EVENT, ::free_Sync_event> sync_evt_q;
std::mutex m; std::mutex mut;
// private
std::thread::id sync_thread_id() std::thread::id sync_thread_id()
{ {
return _sync_thread.get_id(); return _sync_thread.get_id();
} }
int _inject_sync_event(::SYNC_EVENT ev, void *management) // public
void sync_initialize(
SyncModes mode,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
bool adapter_manages_sync_thread)
{
_messageToSend = messageToSend;
_notifyHandshake = notifyHandshake;
_adapter_manages_sync_thread = adapter_manages_sync_thread;
set_sync_mode(mode);
return;
}
// public
void set_sync_mode(SyncModes mode)
{
// std::lock_guard<mutex> lock(mut);
_sync_mode = mode;
if (_sync_mode == SyncModes::Sync) {
// init sesssion with inject_sync = process
// stop sync
session(release);
_inject_action = _process_sync_event;
session(init);
::register_sync_callbacks(session(), nullptr, _notifyHandshake, _retrieve_next_sync_event);
if(!_adapter_manages_sync_thread) {
shutdown();
} else {
// The adapter need to shutdown sync thread
}
}
if (_sync_mode == SyncModes::Async) {
// init session with inject_sync = queue
// start sync thread
session(release);
_inject_action = _queue_sync_event;
session(init);
if(!_adapter_manages_sync_thread) {
if (!is_sync_running()) {
startup<void>(_messageToSend, _notifyHandshake, nullptr, nullptr);
}
} else {
// The adapter need to do sync thread start up
}
}
if (_sync_mode == SyncModes::Off) {
// init sesssion with inject_sync = null
// stop sync thread
if(!_adapter_manages_sync_thread) {
shutdown();
} else {
// Adapter needs to shutdown sync thread
}
session(release);
_inject_action = _queue_sync_event;
session(init);
}
return;
}
// private
int _process_sync_event(::SYNC_EVENT ev, void *management)
{
if (ev != nullptr) {
::do_sync_protocol_step(session(), nullptr, ev);
return 0;
} else {
return 0;
}
}
// private
int _queue_sync_event(::SYNC_EVENT ev, void *management)
{ {
try { try {
if (ev == nullptr) { if (ev == nullptr) {
...@@ -69,12 +146,13 @@ namespace pEp { ...@@ -69,12 +146,13 @@ namespace pEp {
return 0; return 0;
} }
// private
PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr) PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr)
{ {
return passphrase_cache.ensure_passphrase(session, fpr); return passphrase_cache.ensure_passphrase(session, fpr);
} }
// threshold: max waiting time in seconds // public
::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold) ::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold)
{ {
::SYNC_EVENT syncEvent = nullptr; ::SYNC_EVENT syncEvent = nullptr;
...@@ -87,14 +165,16 @@ namespace pEp { ...@@ -87,14 +165,16 @@ namespace pEp {
return syncEvent; return syncEvent;
} }
// public
bool on_sync_thread() bool on_sync_thread()
{ {
return _sync_thread.get_id() == this_thread::get_id(); return _sync_thread.get_id() == this_thread::get_id();
} }
// public
::PEP_SESSION Session::operator()(session_action action) ::PEP_SESSION Session::operator()(session_action action)
{ {
std::lock_guard<mutex> lock(m); std::lock_guard<mutex> lock(mut);
::PEP_STATUS status = ::PEP_STATUS_OK; ::PEP_STATUS status = ::PEP_STATUS_OK;
...@@ -108,12 +188,11 @@ namespace pEp { ...@@ -108,12 +188,11 @@ namespace pEp {
case init: case init:
if (!_session.get()) { if (!_session.get()) {
::PEP_SESSION session_; ::PEP_SESSION session_;
status = ::init(&session_, _messageToSend, _inject_sync_event, _ensure_passphrase); status = ::init(&session_, _messageToSend, _inject_action, _ensure_passphrase);
throw_status(status); throw_status(status);
_session = SessionPtr{session_, ::release}; _session = SessionPtr{session_, ::release};
} }
break; break;
default: default:
status = ::PEP_ILLEGAL_VALUE; status = ::PEP_ILLEGAL_VALUE;
} }
...@@ -122,21 +201,29 @@ namespace pEp { ...@@ -122,21 +201,29 @@ namespace pEp {
return _session.get(); return _session.get();
} }
// public
void shutdown() void shutdown()
{ {
pEpLog("called"); pEpLog("called");
if (_sync_thread.joinable()) { if (_sync_thread.joinable()) {
pEpLog("sync_is_running - injecting null event"); pEpLog("sync_is_running - injecting null event");
_inject_sync_event(nullptr, nullptr); _queue_sync_event(nullptr, nullptr);
_sync_thread.join(); _sync_thread.join();
} }
} }
// public
bool is_sync_running() bool is_sync_running()
{ {
return _sync_thread.joinable(); if(!_adapter_manages_sync_thread) {
return _sync_thread.joinable();
} else {
return false;
}
} }
// public
// Works even if adapter is managing sync thread, BUT must be using this queue
bool in_shutdown() bool in_shutdown()
{ {
SYNC_EVENT ev; SYNC_EVENT ev;
......
...@@ -26,7 +26,25 @@ namespace pEp { ...@@ -26,7 +26,25 @@ namespace pEp {
}; };
namespace Adapter { namespace Adapter {
int _inject_sync_event(::SYNC_EVENT ev, void *management); // public
enum class SyncModes
{
Off,
Sync,
Async
};
void sync_initialize(
SyncModes mode,
::messageToSend_t messageToSend,
::notifyHandshake_t notifyHandshake,
bool adapter_manages_sync_thread);
void set_sync_mode(SyncModes mode);
int _queue_sync_event(::SYNC_EVENT ev, void *management);
int _process_sync_event(::SYNC_EVENT ev, void *management);
::PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr); ::PEP_STATUS _ensure_passphrase(::PEP_SESSION session, const char *fpr);
template<class T = void> template<class T = void>
...@@ -46,7 +64,7 @@ namespace pEp { ...@@ -46,7 +64,7 @@ namespace pEp {
enum session_action enum session_action
{ {
init, init,
release release,
}; };
class Session { class Session {
......
...@@ -19,13 +19,24 @@ namespace pEp { ...@@ -19,13 +19,24 @@ namespace pEp {
extern std::thread _sync_thread; extern std::thread _sync_thread;
extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q; extern ::utility::locked_queue<::SYNC_EVENT, ::free_Sync_event> sync_evt_q;
extern std::mutex m; extern std::mutex mut;
::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold); ::SYNC_EVENT _retrieve_next_sync_event(void *management, unsigned threshold);
static std::exception_ptr _ex; static std::exception_ptr _ex;
static std::atomic_bool register_done{false}; static std::atomic_bool register_done{false};
/*
* Sync Thread
* 1. Execute registered startup function
* 2. Create session for the sync thread (registers: messageToSend, inject_sync_event, ensure_passphrase)
* 3. register_sync_callbacks() (registers: _notifyHandshake, _retrieve_next_sync_event)
* 4. Enter Sync Event Dispatching Loop (do_sync_protocol())
* 5. unregister_sync_callbacks()
* 6. Release the session
* 7. Execute registered shutdown function
*/
// private
template<class T> template<class T>
void sync_thread(T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown) void sync_thread(T *obj, function<void(T *)> _startup, function<void(T *)> _shutdown)
{ {
...@@ -33,13 +44,17 @@ namespace pEp { ...@@ -33,13 +44,17 @@ namespace pEp {
_ex = nullptr; _ex = nullptr;
assert(_messageToSend); assert(_messageToSend);
assert(_notifyHandshake); assert(_notifyHandshake);
// 1. Execute registered startup function
if (obj && _startup) { if (obj && _startup) {
_startup(obj); _startup(obj);
} }
pEpLog("creating session"); pEpLog("creating session for the sync thread");
// 2. Create session for the sync thread
session(); session();
// 3. register_sync_callbacks()
{ {
// TODO: Do we need to use a passphraseWrap here??? // TODO: Do we need to use a passphraseWrap here???
pEpLog("register_sync_callbacks()"); pEpLog("register_sync_callbacks()");
...@@ -50,6 +65,8 @@ namespace pEp { ...@@ -50,6 +65,8 @@ namespace pEp {
_retrieve_next_sync_event); _retrieve_next_sync_event);
pEpLog("register_sync_callbacks() return:" << status); pEpLog("register_sync_callbacks() return:" << status);
// Convert status into exception and store it
// set register_done AFTER that
try { try {
throw_status(status); throw_status(status);
register_done.store(true); register_done.store(true);
...@@ -61,21 +78,35 @@ namespace pEp { ...@@ -61,21 +78,35 @@ namespace pEp {
} }
pEpLog("sync protocol loop started"); pEpLog("sync protocol loop started");
// 4. Enter Sync Event Dispatching Loop (do_sync_protocol())
::do_sync_protocol(session(), (void *)obj); ::do_sync_protocol(session(), (void *)obj);
pEpLog("sync protocol loop ended"); pEpLog("sync protocol loop ended");
// 5. unregister_sync_callbacks()
unregister_sync_callbacks(session()); unregister_sync_callbacks(session());
// 6. Release the session
// TODO: Maybe do that AFTER shutdown?
session(release); session(release);
// 7. Execute registered shutdown function
if (obj && _shutdown) { if (obj && _shutdown) {
_shutdown(obj); _shutdown(obj);
} }
} }
/*
* Sync Thread Startup
* 1. ensure session for the main thread (registers: messageToSend, _queue_sync_event, _ensure_passphrase)
* 2. Start the sync thread
* 3. Defer execution until sync thread register_sync_callbacks() has returned
* 4. Throw pending exception from the sync thread
*/
// private
template<class T> template<class T>
void startup( void startup(
messageToSend_t messageToSend, ::messageToSend_t messageToSend,
notifyHandshake_t notifyHandshake, ::notifyHandshake_t notifyHandshake,
T *obj, T *obj,
function<void(T *)> _startup, function<void(T *)> _startup,
function<void(T *)> _shutdown) function<void(T *)> _shutdown)
...@@ -88,19 +119,23 @@ namespace pEp { ...@@ -88,19 +119,23 @@ namespace pEp {
if (notifyHandshake) { if (notifyHandshake) {
_notifyHandshake = notifyHandshake; _notifyHandshake = notifyHandshake;
} }
pEpLog("creating session"); pEpLog("ensure session for the main thread");
session(); // 1. re-initialize session for the main thread (registers: messageToSend, _queue_sync_event, _ensure_passphrase)
session(release);
session(init);
if (!_sync_thread.joinable()) { if (!_sync_thread.joinable()) {
register_done.store(false); register_done.store(false);
pEpLog("creating sync-thread"); pEpLog("creating sync-thread");
// 2. Start the sync thread
_sync_thread = std::thread(sync_thread<T>, obj, _startup, _shutdown); _sync_thread = std::thread(sync_thread<T>, obj, _startup, _shutdown);
// 3. Defer execution until sync thread register_sync_callbacks() has returned
while (register_done.load() == false) { while (register_done.load() == false) {
pEpLog("waiting for sync-thread to init..."); pEpLog("waiting for sync-thread to init...");
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
// 4. Throw pending exception from the sync thread
if (_ex) { if (_ex) {
pEpLog("exception pending, rethrowing"); pEpLog("exception pending, rethrowing");
std::rethrow_exception(_ex); std::rethrow_exception(_ex);
...@@ -110,4 +145,4 @@ namespace pEp { ...@@ -110,4 +145,4 @@ namespace pEp {
} // namespace Adapter } // namespace Adapter
} // namespace pEp } // namespace pEp
#endif // LIBPEPADAPTER_ADAPTER_HXX #endif //LIBPEPADAPTER_ADAPTER_HXX
\ No newline at end of file \ No newline at end of file