您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12158|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    " a/ A5 h. ]/ u0 ~( ]
  2. require_once __DIR__ . '/Workerman/Autoloader.php';! P; Q9 n5 Y8 H5 w, z0 {- d
  3. 4 x- _! S6 i2 K* W, ]: n+ R
  4. $worker = new Worker();! h- B3 I& l# _) f# c" Z
  5. // 4个进程
    * e( }/ G0 {% D( w/ q) T
  6. $worker->count = 4;. K, D1 \% v; \5 r  Z5 E
  7. // 每个进程启动后在当前进程新增一个Worker监听% h$ J. C9 D) f" G4 L2 c
  8. $worker->onWorkerStart = function($worker)
    - V' W  Q  c- w; T% D
  9. {
    / W8 a% n4 J3 {+ D" }3 y9 d
  10.     /**
    ( z/ f4 b& w) ]9 u$ Y, i- H
  11.      * 4个进程启动的时候都创建2016端口的Worker
    1 [7 R( x* O3 ~& F- `" m; @
  12.      * 当执行到worker->listen()时会报Address already in use错误
    4 l& K/ X2 C2 o+ o/ a: C1 E1 U
  13.      * 如果worker->count=1则不会报错! r+ a1 [/ ~4 U8 c
  14.      */
      s) U1 Z% u; {+ s" L
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ; K! F, V9 h0 G, ]$ n
  16.     $inner_worker->onMessage = 'on_message';2 R4 W" a5 ]5 z9 A; X
  17.     // 执行监听。这里会报Address already in use错误- X! O1 o; X4 E; n* l
  18.     $inner_worker->listen();
    : g/ l6 M" L( I6 G; M
  19. };
    # ^& g# f+ ^+ |3 W8 r
  20. ( ^; ]2 G! ]. W; T4 W: ~% b" I
  21. $worker->onMessage = 'on_message';
    $ ^: e- @; y0 k" V( k! `. D, q) Z
  22. ! `9 g+ w& p/ k9 |" v
  23. function on_message($connection, $data)
    7 w7 \: c: r7 q# f. L3 Q: B2 l
  24. {
    / R+ F( }& ~! s, r4 g1 q7 S; H1 W+ ?
  25.     $connection->send("hello\n");
    ; ?* E* t% p) k4 h( S: ^! \& x
  26. }
    " V9 S, g, F& G0 f6 ?
  27. # Z' n6 j; b% {  s1 ~" g6 c4 M
  28. // 运行worker
    $ m  G! p( U$ t$ O' t7 R
  29. Worker::runAll();% |* R7 ?' f- i" a0 V
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    9 A8 P, X( E: S  c
  31. . J5 U9 ]. @1 d8 k1 e
  32. use Workerman\Worker;
    ! L1 i' k* `: f$ ^
  33. require_once './Workerman/Autoloader.php';. J+ ^/ r- v2 F; @# S

  34. ' N7 F% K2 E" k/ k8 l
  35. $worker = new Worker('text://0.0.0.0:2015');5 p% I" m1 A( v+ V% W3 _5 E
  36. // 4个进程
    5 h. X$ q8 a% A
  37. $worker->count = 4;
    0 n: R2 ?# L, _( i  Z% O% j( ~) x; d, q7 @
  38. // 每个进程启动后在当前进程新增一个Worker监听
    " P! A0 ~- S5 \  s' b$ s6 a
  39. $worker->onWorkerStart = function($worker)
    ( \% h1 N1 v! X' D) h
  40. {3 L4 @, W: ?& k& l+ E
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');" `9 b8 }1 \9 K( Z% y
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& {" w3 Q* s6 h$ j, p! Q; `' k
  43.     $inner_worker->reusePort = true;
    0 s7 G. a* D1 T' e' ^+ ~+ z
  44.     $inner_worker->onMessage = 'on_message';
    % }6 k3 d7 o5 A
  45.     // 执行监听。正常监听不会报错
    ) r9 V- J* K7 Z/ _9 Z/ b9 \
  46.     $inner_worker->listen();& V! ^( d" m2 r% p
  47. };
    - J9 Z0 [3 a4 o& B" n  L
  48. * y( o* w% F7 C+ [1 m2 U
  49. $worker->onMessage = 'on_message';! o4 D' ]6 D7 f4 |* p7 p

  50. * @* L; @# e" L2 C
  51. function on_message($connection, $data)3 N, J, \$ ^2 r6 Z
  52. {
    / x. e, D2 y( y2 v  |/ n3 s( l1 F
  53.     $connection->send("hello\n");
    , S4 C7 L  Q: p6 y8 I& b
  54. }$ o( q, E5 x2 s" s
  55. # ~5 H$ B) ]7 k" q- F0 y
  56. // 运行worker
    + G& v7 G' T5 l! B
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    7 N# U- q) g9 f) c8 O( ~7 P
  2. use Workerman\Worker;
    . v) I4 q+ v$ d$ ^: f% C  t
  3. require_once './Workerman/Autoloader.php';# @0 }+ k: r7 S; Y
  4. // 初始化一个worker容器,监听1234端口
    / u! Z3 @. e: E( ?: ?. V( u
  5. $worker = new Worker('websocket://0.0.0.0:1234');3 T0 a7 j* n( i- [4 S) s8 G* g5 Q
  6. ' }  m6 F$ S) F
  7. /*
    0 ?, P* T1 a: y* T) J, k6 }1 l
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误1 n# [2 o- k& g: Y3 Z4 Z3 }( c: `
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)( q; q! Z) l) X
  10. */
    7 J/ Q6 v: K, e3 h
  11. $worker->count = 1;. A( d8 ?' k( `$ C/ k" f
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口6 O6 p+ l5 _* I  u3 X! U  k& S% X( Z
  13. $worker->onWorkerStart = function($worker); c! i! S+ D! b; X( j
  14. {" T: n3 E2 l8 @0 e
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符, d+ {! t( v" d! b
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    1 Y. S( ?& h. s- I' J  A1 W. t
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    ' X. S, d- F7 d
  18.     {
    + i9 U" \5 m! ~+ K2 D
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    / o, G1 r3 k+ v3 o* t
  20.         $data = json_decode($buffer, true);3 E' }: {0 B" P8 @; d
  21.         $uid = $data['uid'];
    * b6 n6 n' A7 w$ r! E6 o
  22.         // 通过workerman,向uid的页面推送数据/ K. e4 x* z9 U8 @- c( z& K
  23.         $ret = sendMessageByUid($uid, $buffer);7 `2 u/ M  b. b$ K3 o7 j
  24.         // 返回推送结果
      |' s& N$ s7 ?' }
  25.         $connection->send($ret ? 'ok' : 'fail');8 Q" H) l( l2 q' y- \: I. o
  26.     };
    & v  t2 T+ n! T
  27.     // ## 执行监听 ##% ~1 c$ J7 i- [4 ?4 u" e1 c2 S
  28.     $inner_text_worker->listen();% N# ~5 ]" |$ B' ~
  29. };
    5 s9 |- Z- I: G$ K7 v% J
  30. // 新增加一个属性,用来保存uid到connection的映射
    5 v/ w6 s* V* P8 Z  R# w# |0 {0 o
  31. $worker->uidConnections = array();. U1 ^, W; W# h% D# F& W; S
  32. // 当有客户端发来消息时执行的回调函数4 n3 I: E8 n$ r5 g: n
  33. $worker->onMessage = function($connection, $data)
    0 I+ J% ]# k! A7 }6 H
  34. {
    1 \! ~8 ^6 H0 d9 b+ a0 V0 z7 h
  35.     global $worker;; O# r+ U4 o6 @% R: L  [4 n
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    8 R2 j# w' A* h' c3 i/ H0 X
  37.     if(!isset($connection->uid))% T/ E2 g2 H! \
  38.     {
    % B) d; p6 v! H0 c
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)/ m+ N9 q; s7 L' P! M& ]
  40.        $connection->uid = $data;$ H4 R. p) b7 |, G
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,! \3 b$ L; A; J, t: T/ T( o
  42.         * 实现针对特定uid推送数据
    + |* ^& c. Z% q2 h' d* c( \
  43.         */, j$ H1 c% r9 H1 q
  44.        $worker->uidConnections[$connection->uid] = $connection;
    + Z( E% m8 ]! F4 |8 H0 B" e9 I
  45.        return;
    ( s% q0 q) b% P: y0 |% G' w2 C
  46.     }
    0 E8 S0 h* N. S# ]0 }% X1 k
  47. };
    ( _4 t/ u6 r+ I6 k9 f! _: Y8 f0 g, Z
  48. ! g* {+ d: P. W: p. {0 f
  49. // 当有客户端连接断开时! Z; D5 J( q0 p9 ^9 R% f7 R
  50. $worker->onClose = function($connection)0 S3 L% T* v) S: D3 Q& G5 ~- c$ x/ E
  51. {
    + a+ L) n8 g. ]: h3 q' `' z, K
  52.     global $worker;
    " Y, ?9 l" c* |7 r6 \
  53.     if(isset($connection->uid))
    ' C* T7 L! h" _2 G
  54.     {; ?' r# {" A" B$ N- m& r5 T$ k% w
  55.         // 连接断开时删除映射9 T0 ~4 K; w* C# b( P1 L, M
  56.         unset($worker->uidConnections[$connection->uid]);+ l# y" K$ V7 q$ c; G+ F
  57.     }
    $ c( ^5 P+ q* G- p; ^6 Q
  58. };
    - Q6 l# D8 E- L0 ?2 k
  59. . {; N7 V) y$ r# E
  60. // 向所有验证的用户推送数据
    + Q/ d: X3 d6 s5 P& p" g
  61. function broadcast($message)# Q% U- Y$ _( n: e4 I: b
  62. {
    ) A+ ?- j* j3 f0 h: Q- X" Q
  63.    global $worker;5 w' S1 n8 F6 T$ O) w* o2 e: w! z
  64.    foreach($worker->uidConnections as $connection)2 _4 d, j0 q- b( M; x
  65.    {5 e; L- t% w1 i! G' ]  T
  66.         $connection->send($message);
    : }' z8 e/ _2 i" G% f1 q
  67.    }
    5 O* E3 z2 m. G1 @2 p" D
  68. }
    , Q* K9 Y1 J/ f4 E

  69.   ^( r' Z& C8 ^3 V$ e) [
  70. // 针对uid推送数据
    . O6 W, K7 ^6 J9 R( g  y
  71. function sendMessageByUid($uid, $message); r$ y; g+ L1 y5 p/ t! M" r0 D0 n
  72. {* ~( y# T/ }' s1 H8 p, N
  73.     global $worker;
    # P5 Z' U& F4 Z. b% U
  74.     if(isset($worker->uidConnections[$uid]))
    9 N. I. z; g' a1 ~5 f# X
  75.     {
    " R5 T( B% U5 V2 q5 _7 h& ?& k- U$ d
  76.         $connection = $worker->uidConnections[$uid];
    1 w7 N% Z# `- h3 i& S7 r
  77.         $connection->send($message);
    . t6 [5 x$ v; z2 j
  78.         return true;' k  w0 I' p+ V6 k# m! F% }- m
  79.     }$ @9 d& L. b$ b! p0 R9 Y& {
  80.     return false;9 x- i6 p' o7 @' k) k& K; X
  81. }
    $ S  v! X4 v# v6 V  `+ W  w5 P0 |1 m

  82.   P) p$ u  d, T. ^7 q/ y
  83. // 运行所有的worker. Z* V7 C1 d  L" i% h( k
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');) [$ t. m. }3 J4 q+ @
  2. ws.onopen = function(){: X6 y  w, A% K, {; H" \3 D+ K
  3.     var uid = 'uid1';1 [/ I! x2 |6 H- ?, W
  4.     ws.send(uid);6 @8 w. {& c0 u! _4 F$ P% F
  5. };
    + T7 }, ^: j$ G( W7 ^
  6. ws.onmessage = function(e){
    6 i3 K0 x0 x/ j) Y
  7.     alert(e.data);
    ; m* l: Z: j, z; D
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    , A1 r5 j: n. b3 ~' s7 o
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);& S- Q1 z+ L0 n% ^. `1 M* T
  3. // 推送的数据,包含uid字段,表示是给这个uid推送! g# ^* \% `7 X, H9 k2 `
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');& ?) a4 Y. a" g3 y( G6 i; G
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符, r/ M7 F, \5 k
  6. fwrite($client, json_encode($data)."\n");
    ( D) `: f6 {3 y8 l  @: p
  7. // 读取推送结果
    ( A$ n5 h: M' h% f# U5 C5 ?/ W% P
  8. echo fread($client, 8192);
复制代码

$ H1 d& M0 J4 V; q* Q
" E+ T1 G4 ]# f5 t8 C
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 16:43 , Processed in 0.103337 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!